使用pthread下的mutex与cond_var模拟windows下的event几个接口
两个版本的链接:
https://github.com/neosmart/pevents
https://github.com/moya-lang/Event
第一个版本能够模拟等待多个事件中的一个触发,而后者仅最多支持一个事件
但第一个版本在UnlockedWaitForEvent执行后,是需要增加一个判断的,否则会不正确
代码:
#if 0 //#define PULSE//#define WFMO #include<assert.h>#include<errno.h>#include<pthread.h>#include<sys/time.h>#ifdef WFMO
#include<algorithm>#include<deque> #endif structSyncObjectPosix_event_st;//Function declarations void* CreateEvent(void* lpEventAttributes = nullptr, bool manualReset = false, bool initialState = false, void* lpName =nullptr);int DestroyEvent(void* event);//int WaitForEvent(void* event, uint64_t milliseconds = -1); int WaitForSingleObject(void* event, uint64_t milliseconds = -1);int SetEvent(void* event);int ResetEvent(void* event);
#ifdef WFMOint WaitForMultipleObjects(int nCount,const void* *lpHandles, boolbWaitAll, uint64_t milliseconds);int WaitForMultipleEvents(void* *events, int count, boolwaitAll,
uint64_t milliseconds);int WaitForMultipleEvents(void* *events, int count, boolwaitAll,
uint64_t milliseconds,int &index);#endif#ifdef PULSEint PulseEvent(void* event);#endif#ifdef WFMO//Each call to WaitForMultipleObjects initializes a neosmart_wfmo_t object which tracks//the progress of the caller's multi-object wait and dispatches responses accordingly.//One neosmart_wfmo_t struct is shared for all events in a single WFMO call typedef structSyncObjectPosix_wfmo_st {
pthread_mutex_t Mutex;
pthread_cond_t CVariable;intRefCount;
union {int FiredEvent; //WFSO int EventsLeft; //WFMO } Status;boolWaitAll;boolStillWaiting;voidDestroy() {
pthread_mutex_destroy(&Mutex);
pthread_cond_destroy(&CVariable);
}
}SyncObjectPosix_wfmo_st;//typedef SyncObjectPosix_wfmo_t_ *SyncObjectPosix_wfmo_t;//A neosmart_wfmo_info_t object is registered with each event waited on in a WFMO//This reference to neosmart_wfmo_t_ is how the event knows whom to notify when triggered typedef structSyncObjectPosix_wfmo_info_st {
SyncObjectPosix_wfmo_st*Waiter;intWaitIndex;
}SyncObjectPosix_wfmo_info_st;//typedef SyncObjectPosix_wfmo_info_t_ *nSyncObjectPosix_wfmo_info_t; #endif //WFMO //The basic event structure, passed to the caller as an opaque pointer when creating events typedef structSyncObjectPosix_event_st {
pthread_cond_t CVariable;
pthread_mutex_t Mutex;boolAutoReset;boolState;
#ifdef WFMO
std::deque<SyncObjectPosix_wfmo_info_st>RegisteredWaits;#endif}SyncObjectPosix_event_st;
#ifdef WFMOboolRemoveExpiredWaitHelper(SyncObjectPosix_wfmo_info_st wait) {int result = pthread_mutex_trylock(&wait.Waiter->Mutex);if (result ==EBUSY) {return false;
}
assert(result== 0);if (wait.Waiter->StillWaiting == false) {--wait.Waiter->RefCount;
assert(wait.Waiter->RefCount >= 0);bool destroy = wait.Waiter->RefCount == 0;
result= pthread_mutex_unlock(&wait.Waiter->Mutex);
assert(result== 0);if(destroy) {
wait.Waiter->Destroy();deletewait.Waiter;
}return true;
}
result= pthread_mutex_unlock(&wait.Waiter->Mutex);
assert(result== 0);return false;
}#endif //WFMO void* CreateEvent(void* lpEventAttributes, bool manualReset, bool initialState, void*lpName) {
SyncObjectPosix_event_st* event = newSyncObjectPosix_event_st;int result = pthread_cond_init(&event->CVariable, 0);
assert(result== 0);
result= pthread_mutex_init(&event->Mutex, 0);
assert(result== 0);event->State = false;event->AutoReset = !manualReset;if(initialState) {
result= SetEvent(event);
assert(result== 0);
}return event;
}int UnlockedWaitForEvent(void* event, uint64_t milliseconds) {int result = 0;if (!((SyncObjectPosix_event_st*)event)->State) {//Zero-timeout event state check optimization if (milliseconds == 0) {returnWAIT_TIMEOUT;
}
timespec ts;if (milliseconds != (uint64_t)-1) {
timeval tv;
gettimeofday(&tv, NULL);
uint64_t nanoseconds= ((uint64_t)tv.tv_sec) * 1000 * 1000 * 1000 +milliseconds* 1000 * 1000 + ((uint64_t)tv.tv_usec) * 1000;
ts.tv_sec= nanoseconds / 1000 / 1000 / 1000;
ts.tv_nsec= (nanoseconds - ((uint64_t)ts.tv_sec) * 1000 * 1000 * 1000);
}do{//Regardless of whether it's an auto-reset or manual-reset event://wait to obtain the event, then lock anyone else out if (milliseconds != (uint64_t)-1) {
result= pthread_cond_timedwait(&((SyncObjectPosix_event_st*)event)->CVariable, &((SyncObjectPosix_event_st*)event)->Mutex, &ts);
}else{
result= pthread_cond_wait(&((SyncObjectPosix_event_st*)event)->CVariable, &((SyncObjectPosix_event_st*)event)->Mutex);
}
}while (result == 0 && !((SyncObjectPosix_event_st*)event)->State);if (result == 0 && ((SyncObjectPosix_event_st*)event)->AutoReset) {//We've only accquired the event if the wait succeeded ((SyncObjectPosix_event_st*)event)->State = false;
}
}else if (((SyncObjectPosix_event_st*)event)->AutoReset) {//It's an auto-reset event that's currently available;//we need to stop anyone else from using it result = 0;
((SyncObjectPosix_event_st*)event)->State = false;
}//Else we're trying to obtain a manual reset event with a signaled state;//don't do anything returnresult;
}int WaitForSingleObject(void* event, uint64_t milliseconds) {inttempResult;if (milliseconds == 0) {
tempResult= pthread_mutex_trylock(&((SyncObjectPosix_event_st*)event)->Mutex);if (tempResult ==EBUSY) {returnWAIT_TIMEOUT;
}
}else{
tempResult= pthread_mutex_lock(&((SyncObjectPosix_event_st*)event)->Mutex);
}
assert(tempResult== 0);int result = UnlockedWaitForEvent(((SyncObjectPosix_event_st*)event), milliseconds);
tempResult= pthread_mutex_unlock(&((SyncObjectPosix_event_st*)event)->Mutex);
assert(tempResult== 0);if (result ==ETIMEDOUT) {returnWAIT_TIMEOUT;
}returnresult;
}
#ifdef WFMOint WaitForMultipleEvents(void* *events, int count, boolwaitAll,
uint64_t milliseconds) {intunused;returnWaitForMultipleEvents(events, count, waitAll, milliseconds, unused);
}int WaitForMultipleEvents(void* *events, int count, boolwaitAll,
uint64_t milliseconds,int &waitIndex) {
SyncObjectPosix_wfmo_st* wfmo = newSyncObjectPosix_wfmo_st;
SyncObjectPosix_event_st** pp_events = (SyncObjectPosix_event_st**)events;int result = 0;int tempResult = pthread_mutex_init(&wfmo->Mutex, 0);
assert(tempResult== 0);
tempResult= pthread_cond_init(&wfmo->CVariable, 0);
assert(tempResult== 0);
SyncObjectPosix_wfmo_info_st waitInfo;
waitInfo.Waiter=wfmo;
waitInfo.WaitIndex= -1;
wfmo->WaitAll =waitAll;
wfmo->StillWaiting = true;
wfmo->RefCount = 1;if(waitAll) {
wfmo->Status.EventsLeft =count;
}else{
wfmo->Status.FiredEvent = -1;
}
tempResult= pthread_mutex_lock(&wfmo->Mutex);
assert(tempResult== 0);bool done = false;
waitIndex= -1;for (int i = 0; i < count; ++i) {
waitInfo.WaitIndex=i;//Must not release lock until RegisteredWait is potentially added tempResult = pthread_mutex_lock(&pp_events[i]->Mutex);
assert(tempResult== 0);//Before adding this wait to the list of registered waits, let's clean up old, expired//waits while we have the event lock anyway pp_events[i]->RegisteredWaits.erase(std::remove_if(pp_events[i]->RegisteredWaits.begin(),
pp_events[i]->RegisteredWaits.end(),
RemoveExpiredWaitHelper),
pp_events[i]->RegisteredWaits.end());if (UnlockedWaitForEvent(events[i], 0) == 0) {
tempResult= pthread_mutex_unlock(&pp_events[i]->Mutex);
assert(tempResult== 0);if(waitAll) {--wfmo->Status.EventsLeft;
assert(wfmo->Status.EventsLeft >= 0);
}else{
wfmo->Status.FiredEvent =i;
waitIndex=i;
done= true;break;
}
}else{
pp_events[i]->RegisteredWaits.push_back(waitInfo);++wfmo->RefCount;
tempResult= pthread_mutex_unlock(&pp_events[i]->Mutex);
assert(tempResult== 0);
}
}//We set the `done` flag above in case of WaitAny and at least one event was set.//But we need to check again here if we were doing a WaitAll or else we'll incorrectly//return WAIT_TIMEOUT. if (waitAll && wfmo->Status.EventsLeft == 0) {
done= true;
}
timespec ts;if (!done) {if (milliseconds == 0) {
result=WAIT_TIMEOUT;
done= true;
}else if (milliseconds != (uint64_t)-1) {
timeval tv;
gettimeofday(&tv, NULL);
uint64_t nanoseconds= ((uint64_t)tv.tv_sec) * 1000 * 1000 * 1000 +milliseconds* 1000 * 1000 + ((uint64_t)tv.tv_usec) * 1000;
ts.tv_sec= nanoseconds / 1000 / 1000 / 1000;
ts.tv_nsec= (nanoseconds - ((uint64_t)ts.tv_sec) * 1000 * 1000 * 1000);
}
}while (!done) {//One (or more) of the events we're monitoring has been triggered?//If we're waiting for all events, assume we're done and check if there's an event that//hasn't fired But if we're waiting for just one event, assume we're not done until we//find a fired event done = (waitAll && wfmo->Status.EventsLeft == 0) ||(!waitAll && wfmo->Status.FiredEvent != -1);if (!done) {if (milliseconds != (uint64_t)-1) {
result= pthread_cond_timedwait(&wfmo->CVariable, &wfmo->Mutex, &ts);
}else{
result= pthread_cond_wait(&wfmo->CVariable, &wfmo->Mutex);
}if (result != 0) {break;
}
}
}
waitIndex= wfmo->Status.FiredEvent;
wfmo->StillWaiting = false;--wfmo->RefCount;
assert(wfmo->RefCount >= 0);bool destroy = wfmo->RefCount == 0;
tempResult= pthread_mutex_unlock(&wfmo->Mutex);
assert(tempResult== 0);if(destroy) {
wfmo->Destroy();deletewfmo;
}returnresult;
}#endif //WFMO int CloseHandle(void* event) {return DestroyEvent(event);
}int DestroyEvent(void* event) {int result = 0;
#ifdef WFMO
result= pthread_mutex_lock(&((SyncObjectPosix_event_st*)event)->Mutex);
assert(result== 0);
((SyncObjectPosix_event_st*)event)->RegisteredWaits.erase(std::remove_if(((SyncObjectPosix_event_st*)event)->RegisteredWaits.begin(),
((SyncObjectPosix_event_st*)event)->RegisteredWaits.end(),
RemoveExpiredWaitHelper),
((SyncObjectPosix_event_st*)event)->RegisteredWaits.end());
result= pthread_mutex_unlock(&((SyncObjectPosix_event_st*)event)->Mutex);
assert(result== 0);#endifresult= pthread_cond_destroy(&((SyncObjectPosix_event_st*)event)->CVariable);
assert(result== 0);
result= pthread_mutex_destroy(&((SyncObjectPosix_event_st*)event)->Mutex);
assert(result== 0);delete ((SyncObjectPosix_event_st*)event);return 0;
}int SetEvent(void* event) {int result = pthread_mutex_lock(&((SyncObjectPosix_event_st*)event)->Mutex);
assert(result== 0);
((SyncObjectPosix_event_st*)event)->State = true;//Depending on the event type, we either trigger everyone or only one if (((SyncObjectPosix_event_st*)event)->AutoReset) {
#ifdef WFMOwhile (!((SyncObjectPosix_event_st*)event)->RegisteredWaits.empty()) {
SyncObjectPosix_wfmo_info_st* i = &((SyncObjectPosix_event_st*)event)->RegisteredWaits.front();
result= pthread_mutex_lock(&i->Waiter->Mutex);
assert(result== 0);--i->Waiter->RefCount;
assert(i->Waiter->RefCount >= 0);if (!i->Waiter->StillWaiting) {bool destroy = i->Waiter->RefCount == 0;
result= pthread_mutex_unlock(&i->Waiter->Mutex);
assert(result== 0);if(destroy) {
i->Waiter->Destroy();delete i->Waiter;
}
((SyncObjectPosix_event_st*)event)->RegisteredWaits.pop_front();continue;
}
((SyncObjectPosix_event_st*)event)->State = false;if (i->Waiter->WaitAll) {--i->Waiter->Status.EventsLeft;
assert(i->Waiter->Status.EventsLeft >= 0);//We technically should do i->Waiter->StillWaiting = Waiter->Status.EventsLeft//!= 0 but the only time it'll be equal to zero is if we're the last event, so//no one else will be checking the StillWaiting flag. We're good to go without//it. } else{
i->Waiter->Status.FiredEvent = i->WaitIndex;
i->Waiter->StillWaiting = false;
}
result= pthread_mutex_unlock(&i->Waiter->Mutex);
assert(result== 0);
result= pthread_cond_signal(&i->Waiter->CVariable);
assert(result== 0);
((SyncObjectPosix_event_st*)event)->RegisteredWaits.pop_front();
result= pthread_mutex_unlock(&((SyncObjectPosix_event_st*)event)->Mutex);
assert(result== 0);return 0;
}#endif //WFMO //event->State can be false if compiled with WFMO support if (((SyncObjectPosix_event_st*)event)->State) {
result= pthread_mutex_unlock(&((SyncObjectPosix_event_st*)event)->Mutex);
assert(result== 0);
result= pthread_cond_signal(&((SyncObjectPosix_event_st*)event)->CVariable);
assert(result== 0);return 0;
}
}else{
#ifdef WFMOfor (size_t i = 0; i < ((SyncObjectPosix_event_st*)event)->RegisteredWaits.size(); ++i) {
SyncObjectPosix_wfmo_info_st* info = &((SyncObjectPosix_event_st*)event)->RegisteredWaits[i];
result= pthread_mutex_lock(&info->Waiter->Mutex);
assert(result== 0);--info->Waiter->RefCount;
assert(info->Waiter->RefCount >= 0);if (!info->Waiter->StillWaiting) {bool destroy = info->Waiter->RefCount == 0;
result= pthread_mutex_unlock(&info->Waiter->Mutex);
assert(result== 0);if(destroy) {
info->Waiter->Destroy();delete info->Waiter;
}continue;
}if (info->Waiter->WaitAll) {--info->Waiter->Status.EventsLeft;
assert(info->Waiter->Status.EventsLeft >= 0);//We technically should do i->Waiter->StillWaiting = Waiter->Status.EventsLeft//!= 0 but the only time it'll be equal to zero is if we're the last event, so//no one else will be checking the StillWaiting flag. We're good to go without//it. } else{
info->Waiter->Status.FiredEvent = info->WaitIndex;
info->Waiter->StillWaiting = false;
}
result= pthread_mutex_unlock(&info->Waiter->Mutex);
assert(result== 0);
result= pthread_cond_signal(&info->Waiter->CVariable);
assert(result== 0);
}
((SyncObjectPosix_event_st*)event)->RegisteredWaits.clear();#endif //WFMOresult= pthread_mutex_unlock(&((SyncObjectPosix_event_st*)event)->Mutex);
assert(result== 0);
result= pthread_cond_broadcast(&((SyncObjectPosix_event_st*)event)->CVariable);
assert(result== 0);
}return 0;
}int ResetEvent(void* event) {int result = pthread_mutex_lock(&((SyncObjectPosix_event_st*)event)->Mutex);
assert(result== 0);
((SyncObjectPosix_event_st*)event)->State = false;
result= pthread_mutex_unlock(&((SyncObjectPosix_event_st*)event)->Mutex);
assert(result== 0);return 0;
}
#ifdef PULSEint PulseEvent(void* event) {//This may look like it's a horribly inefficient kludge with the sole intention of reducing//code duplication, but in reality this is what any PulseEvent() implementation must look//like. The only overhead (function calls aside, which your compiler will likely optimize//away, anyway), is if only WFMO auto-reset waits are active there will be overhead to//unnecessarily obtain the event mutex for ResetEvent() after. In all other cases (being no//pending waits, WFMO manual-reset waits, or any WFSO waits), the event mutex must first be//released for the waiting thread to resume action prior to locking the mutex again in//order to set the event state to unsignaled, or else the waiting threads will loop back//into a wait (due to checks for spurious CVariable wakeups). int result = SetEvent(event);
assert(result== 0);
result= ResetEvent(event);
assert(result== 0);return 0;
}#endif #else#include<mutex>#include<condition_variable>#include<chrono>#include<functional> classSyncObjectPosix
{public:
SyncObjectPosix(bool initial, boolmanual) :
state(initial), manual(manual)
{
}void change(boolmanual)
{
std::unique_lock<std::mutex> lock(mutex);this->manual =manual;
}void set()
{
std::unique_lock<std::mutex> lock(mutex);if(state)return;
state= true;if(manual)
condition.notify_all();elsecondition.notify_one();
}voidreset()
{
std::unique_lock<std::mutex> lock(mutex);
state= false;
}voidwait()
{
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock, [this] { returnstate; });if (!manual)
state= false;
}
template<class Rep, class Period> int wait(const std::chrono::duration<Rep, Period> &timeout)
{
std::unique_lock<std::mutex> lock(mutex);if (!condition.wait_for(lock, timeout, [this] {returnstate;} )) {returnWAIT_TIMEOUT;
}//return; if (!manual)
state= false;return 0;
}private:boolreturn_state() {returnstate;
};
std::mutex mutex;
std::condition_variable condition;boolstate, manual;
};
inlinevoid* CreateEvent(void* lpEventAttributes, BOOL bManualReset, BOOL bInitialState, void*lpName){return (void*)(newSyncObjectPosix(bInitialState, bManualReset));
}
inlinevoid CloseHandle(void*p_this) {delete (SyncObjectPosix*)p_this;
}
inlineint WaitForSingleObject(void* p_this, uint64_t milliseconds = -1){
SyncObjectPosix* event_this = (SyncObjectPosix*)p_this;return event_this->wait(std::chrono::milliseconds(milliseconds));
}
inlineint SetEvent(void*p_this){
SyncObjectPosix* event_this = (SyncObjectPosix*)p_this;
event_this->set();return 0;
}
inlineint ResetEvent(void*p_this){
SyncObjectPosix* event_this = (SyncObjectPosix*)p_this;
event_this->reset();return 0;
}#endif