00001
00002
00003
00004
00005 #if !defined (AQSENSE_SAL3DPP_PIPELINE_HPP)
00006 #define AQSENSE_SAL3DPP_PIPELINE_HPP
00007
00008 #if defined (_MSC_VER) && (_MSC_VER >= 1020)
00009 #pragma once
00010 #endif // _MSC_VER && _MSC_VER >= 1020
00011
00012 #include <cassert>
00013 #include <stdexcept>
00014 #include <string>
00015 #include <sal3d/threadpool.h>
00016 #include <sal3dpp/Common.hpp>
00017
00018 namespace sal3d
00019 {
00118 template<typename Task>
00119 class PipeLine
00120 {
00121 public:
00129 PipeLine(int nthreads)
00130 :_nthreads(nthreads)
00131 {
00132 Error e;
00133 sal3d_threadpool_new(nthreads, &_threadpool, 0, &e.e);
00134 if (e.e.value < 0)
00135 throw e;
00136 }
00137
00145 PipeLine(int nthreads, const char *basename)
00146 :_nthreads(nthreads)
00147 {
00148 Error e;
00149 sal3d_threadpool_new(nthreads, &_threadpool, basename, &e.e);
00150 if (e.e.value < 0)
00151 throw e;
00152 }
00153
00158 ~PipeLine()
00159 {
00160 sal3d_threadpool_delete(&_threadpool, 0);
00161 }
00162
00174 void post(const Task &t)
00175 {
00176
00177 Task *copy = new Task(t);
00178
00179 Error e;
00180 sal3d_threadpool_post_sequential(_threadpool,
00181 callbackParallel,
00182 reinterpret_cast<void *>(copy),
00183 callbackSequential,
00184 reinterpret_cast<void *>(copy),
00185 &e.e
00186 );
00187 if (e.e.value < 0)
00188 {
00189 delete copy;
00190 throw e;
00191 }
00192 }
00193
00198 void wait()
00199 {
00200 Error e;
00201 sal3d_threadpool_wait(_threadpool, &e.e);
00202 if (e.e.value < 0)
00203 throw e;
00204 }
00205
00213 bool wait(int timeout_ms)
00214 {
00215 Error e;
00216 sal3d_threadpool_wait_timeout(_threadpool,
00217 timeout_ms, &e.e);
00218 if (e.e.value < 0)
00219 {
00220 if (e.e.value == SAL3D_ERROR_TIMEOUT)
00221 return false;
00222 throw e;
00223 }
00224 return true;
00225 }
00226
00231 int nthreads() const
00232 {
00233 return _nthreads;
00234 }
00235
00236 private:
00237
00238 static void callbackParallel(void *data)
00239 {
00240 Task *t = reinterpret_cast<Task *>(data);
00241 try {
00242 t->parallel();
00243 }
00244 catch(...)
00245 {
00246 assert(0 && "Exception thrown in callback for Task");
00247 }
00248 }
00249
00250 static void callbackSequential(void *data)
00251 {
00252 Task *t = reinterpret_cast<Task *>(data);
00253 try {
00254 t->sequential();
00255 }
00256 catch(...)
00257 {
00258 assert(0 && "Exception thrown in callback for Task");
00259 }
00260
00261 delete t;
00262 }
00263
00264
00265 PipeLine(const PipeLine &);
00266 PipeLine& operator=(const PipeLine &);
00267
00268 sal3d_threadpool _threadpool;
00269 int _nthreads;
00270 };
00271 }
00272 #endif // !AQSENSE_SAL3DPP_PIPELINE_HPP