PipeLine.hpp
00001 //
00002 // 3D Shape Analysis Library.
00003 // Copyright (C) 2010 AQSENSE, S.L.
00004 //
00005 #if !defined (AQSENSE_SAL3DPP_PIPELINE_HPP)
00006 #define AQSENSE_SAL3DPP_PIPELINE_HPP
00007 
00008 #if defined (_MSC_VER) && (_MSC_VER >= 1020)
00009 #pragma once
00010 #endif // _MSC_VER && _MSC_VER >= 1020
00011 
00012 #include <cassert>
00013 #include <stdexcept>
00014 #include <string>
00015 #include <sal3d/threadpool.h>
00016 #include <sal3dpp/Common.hpp>
00017 
00018 namespace sal3d
00019 {
00118     template<typename Task>
00119     class PipeLine
00120     {
00121         public:
00129             PipeLine(int nthreads)
00130                 :_nthreads(nthreads)
00131             {
00132                 Error e;
00133                 sal3d_threadpool_new(nthreads, &_threadpool, 0, &e.e);
00134                 if (e.e.value < 0)
00135                     throw e;
00136             } 
00137 
00145             PipeLine(int nthreads, const char *basename)
00146                 :_nthreads(nthreads)
00147             {
00148                 Error e;
00149                 sal3d_threadpool_new(nthreads, &_threadpool, basename, &e.e);
00150                 if (e.e.value < 0)
00151                     throw e;
00152             } 
00153 
00158             ~PipeLine()
00159             {
00160                 sal3d_threadpool_delete(&_threadpool, 0);
00161             }
00162 
00174             void post(const Task &t)
00175             {
00176                 // This will be deleted after running, on callback.
00177                 Task *copy = new Task(t);
00178 
00179                 Error e;
00180                 sal3d_threadpool_post_sequential(_threadpool,
00181                         callbackParallel,
00182                         reinterpret_cast<void *>(copy),
00183                         callbackSequential,
00184                         reinterpret_cast<void *>(copy),
00185                         &e.e
00186                         );
00187                 if (e.e.value < 0)
00188                 {
00189                     delete copy;
00190                     throw e;
00191                 }
00192             }
00193 
00198             void wait()
00199             {
00200                 Error e;
00201                 sal3d_threadpool_wait(_threadpool, &e.e);
00202                 if (e.e.value < 0)
00203                     throw e;
00204             }
00205 
00213             bool wait(int timeout_ms)
00214             {
00215                 Error e;
00216                 sal3d_threadpool_wait_timeout(_threadpool,
00217                         timeout_ms, &e.e);
00218                 if (e.e.value < 0)
00219                 {
00220                     if (e.e.value == SAL3D_ERROR_TIMEOUT)
00221                         return false;
00222                     throw e;
00223                 }
00224                 return true;
00225             }
00226 
00231             int nthreads() const
00232             {
00233                 return _nthreads;
00234             }
00235 
00236         private:
00237 
00238             static void callbackParallel(void *data)
00239             {
00240                 Task *t = reinterpret_cast<Task *>(data);
00241                 try {
00242                     t->parallel();
00243                 }
00244                 catch(...)
00245                 {
00246                     assert(0 && "Exception thrown in callback for Task");
00247                 }
00248             }
00249 
00250             static void callbackSequential(void *data)
00251             {
00252                 Task *t = reinterpret_cast<Task *>(data);
00253                 try {
00254                     t->sequential();
00255                 }
00256                 catch(...)
00257                 {
00258                     assert(0 && "Exception thrown in callback for Task");
00259                 }
00260 
00261                 delete t;
00262             }
00263 
00264             /* Uncopiable */
00265             PipeLine(const PipeLine &);
00266             PipeLine& operator=(const PipeLine &);
00267 
00268             sal3d_threadpool _threadpool;
00269             int _nthreads;
00270     };
00271 }
00272 #endif // !AQSENSE_SAL3DPP_PIPELINE_HPP