Main Page | Namespace List | Class Hierarchy | Class List | File List | Namespace Members | Class Members | File Members

Task.h

Go to the documentation of this file.
00001 // File: Task.h
00002 // Author: Francesco Giacomini <Francesco.Giacomini@cnaf.infn.it>
00003 // Copyright (c) 2001 EU DataGrid.
00004 // For license conditions see http://www.eu-datagrid.org/license.html
00005 
00006 // $Id: Task.h,v 1.2 2002/11/12 13:58:11 giaco Exp $
00007 
00008 #ifndef EDG_WORKLOAD_COMMON_TASK_TASK_H
00009 #define EDG_WORKLOAD_COMMON_TASK_TASK_H
00010 
00011 #include <deque>
00012 #include <cassert>
00013 #include <boost/function.hpp>
00014 #include <boost/thread/mutex.hpp>
00015 #include <boost/thread/condition.hpp>
00016 #include <boost/thread/thread.hpp>
00017 
00018 namespace edg {
00019 namespace workload {
00020 namespace common {
00021 namespace task {
00022 
00023 // exceptions
00024 struct Eof {};
00025 struct SigPipe {};
00026 struct Empty {};
00027 struct Full {};
00028 
00029 template<typename T> class Pipe;
00030 
00031 template<typename T>
00032 class PipeReadEnd {
00033 
00034   friend class Pipe<T>;
00035 
00036   Pipe<T>* m_parent;
00037 
00038 public:
00039   PipeReadEnd(Pipe<T>* parent = 0): m_parent(parent) {}
00040   PipeReadEnd(const PipeReadEnd<T>& rhs) : m_parent(rhs.m_parent) {}
00041   PipeReadEnd<T>& operator=(const PipeReadEnd<T>& rhs)
00042   {
00043     m_parent = rhs.m_parent;
00044     return *this;
00045   }
00046   ~PipeReadEnd() {}
00047   T read() { return m_parent->read(); }
00048   T try_read() { return m_parent->try_read(); }
00049   void open() { m_parent->open_read_end(); }
00050   void close() { m_parent->close_read_end(); }
00051 };
00052 
00053 template<typename T>
00054 class PipeWriteEnd {
00055 
00056   friend class Pipe<T>;
00057 
00058   Pipe<T>* m_parent;
00059 
00060 public:
00061   PipeWriteEnd(Pipe<T>* parent = 0): m_parent(parent) {}
00062   PipeWriteEnd(const PipeWriteEnd& rhs): m_parent(rhs.m_parent) {}
00063   PipeWriteEnd& operator=(const PipeWriteEnd& rhs)
00064   {
00065     m_parent = rhs.m_parent;
00066     return *this;
00067   }
00068   ~PipeWriteEnd() {}
00069 
00070   void write(T obj) { m_parent->write(obj); }
00071   void try_write(T obj) { m_parent->try_write(obj); }
00072   void open() { m_parent->open_write_end(); }
00073   void close() { m_parent->close_write_end(); }
00074 };
00075 
00076 // when a pipe is created it is open both for reading and for writing,
00077 // although there are no readers and writers at that moment
00078 // a pipe end (read or write) becomes closed when all the threads who had
00079 // previously opened it have then closed it
00080 template<typename T>
00081 class Pipe {
00082 
00083   boost::mutex m_mutex;       // control any access to the pipe
00084   std::deque<T> m_queue;
00085   size_t m_max_size;
00086   boost::condition m_not_full; // sync access to the queue
00087   boost::condition m_not_empty; // sync access to the queue
00088 
00089   int m_num_writers;
00090   int m_num_readers;
00091 
00092   bool m_write_end_is_closed;
00093   bool m_read_end_is_closed;
00094 
00095   // non-copyable
00096   Pipe(const Pipe& rhs);
00097   Pipe& operator=(const Pipe& rhs);
00098 
00099 public:
00100   Pipe(size_t max_size = 10)
00101     : m_max_size(max_size), m_num_writers(0), m_num_readers(0),
00102       m_write_end_is_closed(false), m_read_end_is_closed(false)
00103   {
00104   }
00105   ~Pipe() {}
00106 
00107   PipeReadEnd<T> read_end() { return PipeReadEnd<T>(this); }
00108   PipeWriteEnd<T> write_end() { return PipeWriteEnd<T>(this); }
00109 
00110   // before the first read is done m_num_writers > 0
00111   T read()
00112   {
00113     boost::mutex::scoped_lock lock(m_mutex);
00114 
00115     while (m_queue.empty()) {
00116       if (m_write_end_is_closed) { // see comment in close_write_end()
00117         throw Eof();
00118       }
00119       m_not_empty.wait(lock);
00120     }
00121     T result = m_queue.front();
00122     m_queue.pop_front();
00123     m_not_full.notify_one();
00124 
00125     return result;
00126   }
00127 
00128   T try_read()
00129   {
00130     boost::mutex::scoped_lock lock(m_mutex);
00131 
00132     if (m_queue.empty()) {
00133       if (m_write_end_is_closed) { // see comment in close_write_end()
00134         throw Eof();
00135       } else {
00136         throw Empty();
00137       }
00138     }
00139 
00140     T result = m_queue.front();
00141     m_queue.pop_front();
00142     m_not_full.notify_one();
00143 
00144     return result;
00145   }
00146 
00147   void write(const T& obj)
00148   {
00149     boost::mutex::scoped_lock lock(m_mutex);
00150 
00151     if (m_read_end_is_closed) {
00152       throw SigPipe();
00153     }
00154 
00155     while (m_queue.size() == m_max_size) {
00156       m_not_full.wait(lock);
00157       if (m_read_end_is_closed) { // see comment in close_read()
00158         throw SigPipe();
00159       }
00160     }
00161     m_queue.push_back(obj);
00162     m_not_empty.notify_one();
00163   }
00164 
00165   void try_write(const T& obj)
00166   {
00167     boost::mutex::scoped_lock lock(m_mutex);
00168 
00169     if (m_read_end_is_closed) {
00170       throw SigPipe();
00171     } else if (m_queue.size() == m_max_size) {
00172       throw Full();
00173     }
00174 
00175     m_queue.push_back(obj);
00176     m_not_empty.notify_one();
00177   }
00178 
00179   bool empty()
00180   {
00181     boost::mutex::scoped_lock lock(m_mutex);
00182 
00183     return m_queue.empty();
00184   }
00185 
00186   void open_read_end(int num = 1)
00187   {
00188     boost::mutex::scoped_lock lock(m_mutex);
00189 
00190     m_num_readers += num;
00191   }
00192 
00193   void open_write_end(int num = 1)
00194   {
00195     boost::mutex::scoped_lock lock(m_mutex);
00196 
00197     m_num_writers += num;
00198   }
00199 
00200   void close_read_end()
00201   {
00202     boost::mutex::scoped_lock lock(m_mutex);
00203 
00204     --m_num_readers;
00205     if (m_num_readers == 0) {
00206       // be careful when writing... (see comment in write())
00207       m_read_end_is_closed = true;
00208       // wake up all writers so that they can realize that they will never
00209       // be able to push data again into the pipe
00210       m_not_full.notify_all();
00211     }
00212   }
00213 
00214   void close_write_end()
00215   {
00216     boost::mutex::scoped_lock lock(m_mutex);
00217 
00218     --m_num_writers;
00219     if (m_num_writers == 0) {
00220       // be careful when reading... (see comment in read())
00221       m_write_end_is_closed = true;
00222       // wake up all readers so that they can realize that no other data
00223       // will ever appear on the pipe
00224       m_not_empty.notify_all();
00225     }
00226   }
00227 
00228 };
00229 
00230 template<typename T_IN>
00231 class PipeReader
00232 {
00233   PipeReadEnd<T_IN> m_read_end;
00234 
00235   friend class Task;
00236   void read_from(const PipeReadEnd<T_IN>& from) { m_read_end = from; }
00237 
00238 protected:
00239   PipeReader() {}
00240   PipeReadEnd<T_IN>& read_end() { return m_read_end; }
00241 
00242 public:
00243   virtual ~PipeReader() {}
00244   virtual void run() = 0;
00245 };
00246 
00247 
00248 template<typename T_OUT>
00249 class PipeWriter
00250 {
00251   PipeWriteEnd<T_OUT> m_write_end;
00252 
00253   friend class Task;
00254   void write_to(const PipeWriteEnd<T_OUT>& to) { m_write_end = to; }
00255 
00256 protected:
00257   PipeWriter() {}
00258   PipeWriteEnd<T_OUT>& write_end() { return m_write_end; }
00259 
00260 public:
00261   virtual ~PipeWriter() {}
00262   virtual void run() = 0;
00263 };
00264 
00265 template<typename T_IN, typename T_OUT>
00266 class PipeForwarder
00267 {
00268   PipeReadEnd<T_IN> m_read_end;
00269   PipeWriteEnd<T_OUT> m_write_end;
00270 
00271   friend class Task;
00272   void read_from(const PipeReadEnd<T_IN>& from) { m_read_end = from; }
00273   void write_to(const PipeWriteEnd<T_OUT>& to) { m_write_end = to; }
00274 
00275 protected:
00276   PipeForwarder() {}
00277   PipeReadEnd<T_IN>& read_end() { return m_read_end; }
00278   PipeWriteEnd<T_OUT>& write_end() { return m_write_end; }
00279 
00280 public:
00281   virtual ~PipeForwarder() {}
00282   virtual void run() = 0;
00283 };
00284 
00285 // aux class for "Resource acquisition is initialization"
00286 template<typename E>
00287 struct Access
00288 {
00289   E& m_e;
00290   Access(E& e): m_e(e) { m_e.open(); }
00291   ~Access() { m_e.close(); }
00292 };
00293  
00294 // aux class to guarantee to close a pipe end at the end of the thread function
00295 template<typename E>
00296 struct CloseOnExit
00297 {
00298   E& m_e;
00299   CloseOnExit(E& e): m_e(e) {}
00300   ~CloseOnExit() { m_e.close(); }
00301 };
00302  
00303 template<typename T_IN>
00304 class ReaderFunctor
00305 {
00306   PipeReader<T_IN>& m_runner;
00307   PipeReadEnd<T_IN> m_read_end;
00308 
00309 public:
00310   ReaderFunctor(PipeReader<T_IN>& runner, PipeReadEnd<T_IN> end)
00311     : m_runner(runner), m_read_end(end)
00312   {}
00313 
00314   void operator()()
00315   {
00316     CloseOnExit< PipeReadEnd<T_IN> > a(m_read_end);
00317 
00318     m_runner.run();
00319   }
00320 };
00321 
00322 template<typename T_OUT>
00323 class WriterFunctor
00324 {
00325   PipeWriter<T_OUT>& m_runner;
00326   PipeWriteEnd<T_OUT> m_write_end;
00327 
00328 public:
00329   WriterFunctor(PipeWriter<T_OUT>& runner, PipeWriteEnd<T_OUT> end)
00330     : m_runner(runner), m_write_end(end)
00331   {}
00332 
00333   void operator()()
00334   {
00335     CloseOnExit< PipeWriteEnd<T_OUT> > a(m_write_end);
00336 
00337     m_runner.run();
00338   }
00339 };
00340 
00341 template<typename T_IN, typename T_OUT>
00342 class ForwarderFunctor
00343 {
00344   PipeForwarder<T_IN, T_OUT>& m_runner;
00345   PipeReadEnd<T_IN> m_read_end;
00346   PipeWriteEnd<T_OUT> m_write_end;
00347 
00348 public:
00349   ForwarderFunctor(PipeForwarder<T_IN, T_OUT>& runner,
00350                    PipeReadEnd<T_IN> read_end,
00351                    PipeWriteEnd<T_OUT> write_end)
00352     : m_runner(runner), m_read_end(read_end), m_write_end(write_end)
00353   {}
00354 
00355   void operator()()
00356   {
00357     CloseOnExit< PipeReadEnd<T_IN> > r(m_read_end);
00358     CloseOnExit< PipeWriteEnd<T_OUT> > w(m_write_end);
00359 
00360     m_runner.run();
00361   }
00362 };
00363 
00364 class Task
00365 {
00366   boost::thread_group* m_group;
00367 
00368   // non-copyable
00369   Task(const Task& rhs);
00370   Task& operator=(const Task& rhs);
00371 
00372 public:
00373 
00374   Task(const boost::function0<void>& fun, int num_threads = 1)
00375     : m_group(new boost::thread_group)
00376   {
00377     assert(num_threads > 0);
00378 
00379     while (num_threads--) {
00380       m_group->create_thread(fun);
00381     }
00382   }
00383 
00384   template<typename T>
00385   Task(PipeReader<T>& runner, Pipe<T>& pipe, int num_threads = 1)
00386     : m_group(new boost::thread_group)
00387   {
00388     assert(num_threads > 0);
00389 
00390     runner.read_from(pipe.read_end());
00391     ReaderFunctor<T> f(runner, pipe.read_end());
00392     pipe.open_read_end(num_threads);
00393     while (num_threads--) {
00394       m_group->create_thread(f);
00395     }
00396   }
00397 
00398   template<typename T>
00399   Task(PipeWriter<T>& runner, Pipe<T>& pipe, int num_threads = 1)
00400     : m_group(new boost::thread_group)
00401   {
00402     assert(num_threads > 0);
00403 
00404     runner.write_to(pipe.write_end());
00405     WriterFunctor<T> f(runner, pipe.write_end());
00406     pipe.open_write_end(num_threads);
00407     while (num_threads--) {
00408       m_group->create_thread(f);
00409     }
00410   }
00411 
00412   template<typename T_IN, typename T_OUT>
00413   Task(PipeForwarder<T_IN, T_OUT>& runner,
00414        Pipe<T_IN>& pipe_in, Pipe<T_OUT>& pipe_out,
00415        int num_threads = 1)
00416     : m_group(new boost::thread_group)
00417   {
00418     assert(num_threads > 0);
00419 
00420     runner.read_from(pipe_in.read_end());
00421     pipe_in.open_read_end(num_threads);
00422     runner.write_to(pipe_out.write_end());
00423     pipe_out.open_write_end(num_threads);
00424     ForwarderFunctor<T_IN, T_OUT> f(runner, pipe_in.read_end(), pipe_out.write_end());
00425     while (num_threads--) {
00426       m_group->create_thread(f);
00427     }      
00428   }
00429 
00430   ~Task()
00431   {
00432     m_group->join_all();
00433     delete m_group;
00434   }
00435 
00436 };
00437 
00438 } // namespace task
00439 } // namespace common
00440 } // namespace workload
00441 } // namespace edg
00442 
00443 #endif // EDG_WORKLOAD_COMMON_TASK_TASK_H

Generated on Wed Mar 1 00:37:55 2006 for COMMON API - configuration, jobid, ldif2classadi, logger, process, requestad, socket++i, task, utilities by doxygen 1.3.5