dmlite  0.6
TaskExec.h
Go to the documentation of this file.
1 /*
2  * Copyright 2015 CERN
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17 #ifndef DMLITE_TASKEXEC_H
18 #define DMLITE_TASKEXEC_H
19 
20 /** @file TaskExec.h
21  * @brief A class that spawns commands that perform actions
22  * @author Fabrizio Furano
23  * @date Dec 2015
24  */
25 
26 
27 #include <boost/thread.hpp>
28 #include <signal.h>
29 #include <vector>
30 #include <string>
31 #include <algorithm>
32 #include <sstream>
33 #include <iterator>
34 #include <iostream>
36 
37 namespace dmlite {
38  class dmTaskExec;
39 
40  class dmTask: public boost::mutex {
41 
42  protected:
43  /// Threads waiting for result about this task will wait and synchronize here
44  /// using something like
45  /// boost::lock_guard< boost::mutex > l(workmutex);
46  ///
47  boost::condition_variable condvar;
48  public:
49  dmTask(dmTaskExec *wheretolog);
50  dmTask(const dmTask &o) {
51  key = o.key;
52  cmd = o.cmd;
53  for(unsigned int i = 0; i < 64; i++) parms[i] = NULL;
55  starttime = o.starttime;
56  endtime = o.endtime;
57  finished = o.finished;
58  fd[0] = 0; fd[1] = 0; fd[2] = 0;
59  this->stdout = o.stdout;
60  this->loggerinst = o.loggerinst;
61  }
62 
63  ~dmTask();
64  int key;
65 
66  std::string cmd;
67  const char *parms[64];
68 
70 
71  time_t starttime, endtime;
72  bool finished;
73 
74  int fd[3];
75  pid_t pid;
76  std::string stdout;
77 
78  /// Split che command string into the single parms
79  void splitCmd();
80 
81  /// Wait until the task has finished or the timeout is expired
82  int waitFinished(int tmout=5);
83 
84  void notifyAll() {
85  condvar.notify_all();
86  }
87 
89  };
90 
91 
92  /// Allows to spawn commands, useful for checksum calculations or file pulling
93  /// The spawned commands are pollable, i.e. in a given moment it's possible to
94  /// know the list of commands that are still running.
95  /// Objects belonging to this class in general are created in the disk nodes,
96  /// e.g. for running checksums or file copies and pulls
97  class dmTaskExec: public boost::recursive_mutex {
98 
99  public:
100  dmTaskExec();
101  ~dmTaskExec();
102  std::string instance;
103  /// Executes a command. Returns a positive integer as a key to reference
104  /// the execution status and the result
105  /// The mechanics is that a detached thread is started. This guy invokes popen3
106  /// and blocks waiting for the process to end. Upon end it updates the corresponding
107  /// instance of dmTask with the result and the stdout
108  int submitCmd(std::string cmd);
109 
110 
111  /// Executes a command. Returns a positive integer as a key to reference
112  // the execution status and the result
113  // The mechanics is that a detached thread is started. This guy invokes popen3
114  // and blocks waiting for the process to end. Upon end it updates the corresponding
115  // instance of dmTask with the result and the stdout
116  // -1 is returned in case of error in the submission
117  int submitCmd(std::vector<std::string> &args);
118 
119  /// Actually starts the thread corresponding to a command that was just submitted
120  /// Avoids race conditions
121  void goCmd(int id);
122 
123  /// Split che command string into the single parms
124  void assignCmd(dmTask *task, std::vector<std::string> &args);
125 
126  /// Get the results of a task.
127  /// Wait at max tmout seconds until the task finishes
128  /// Return 0 if the task has finished and there is a result
129  /// Return nonzero if the task is still running
130  int waitResult(int taskID, int tmout=5);
131 
132  //kill a specific task given the id
133  int killTask(int taskID);
134 
135  //get a dmTask given the id ( mainly for testing)
136  dmTask* getTask(int taskID);
137 
138  //get the current stdout of a task which may be running
139  int getTaskStdout(int taskID, std::string &stdout);
140 
141  /// Loops over all the tasks and:
142  /// - send a notification to the head node about all the processes that are running or that have finished
143  /// - garbage collect the task list.
144  /// - Task that are finished since long (e.g. 1 hour)
145  /// - Tasks that are stuck (e.g. 1 day)
146  void tick();
147 
148  int getTaskCounters(int &tot, int &running);
149 
150 
151  /// Event invoked internally to log stuff
152  virtual void onLoggingRequest(Logger::Level lvl, std::string const & msg) = 0;
153  /// Event invoked internally to log stuff
154  virtual void onErrLoggingRequest(std::string const & msg) = 0;
155 
156  protected:
157 
158  /// event for immediate notifications when a task finishes
159  /// Subclasses can specialize this and apply app-dependent behavior to
160  /// perform actions when something has finished running
161  /// NOTE the signature. This passes copies of Task objects, not the originals
162  virtual void onTaskCompleted(dmTask &task);
163 
164  // event that notifies that a task is running
165  // This event can be invoked multiple times during the life of a task
166  /// NOTE the signature. This passes copies of Task objects, not the originals
167  virtual void onTaskRunning(dmTask &task);
168 
169 
170  private:
171 
172  int popen3(int fd[3], pid_t *pid, const char ** argv );
173 
174  /// Used to create keys to be inserted into the map. This has to be treated modulo MAXINT or similar big number
175  int taskcnt;
176  /// This map works like a sparse array :-)
177  std::map<int, dmTask*> tasks;
178 
179 
180  /// Here we invoke popen3
181  /// and block waiting for the process to end. Upon end it updates the corresponding
182  /// instance of dmTask with the result and the stdout
183  virtual void run(int key);
184 
185  //kill a specific task
186  int killTask(dmTask *task);
187  };
188 
189 
190 
191 }
192 
193 
194 
195 
196 
197 
198 
199 
200 
201 
202 
203 
204 
205 #endif
206 
std::map< int, dmTask * > tasks
This map works like a sparse array :-)
Definition: TaskExec.h:177
virtual void run(int key)
void splitCmd()
Split the command string into the single parms.
int key
Definition: TaskExec.h:64
Definition: TaskExec.h:40
dmTaskExec * loggerinst
Definition: TaskExec.h:88
Definition: TaskExec.h:97
virtual void onLoggingRequest(Logger::Level lvl, std::string const &msg)=0
Event invoked internally to log stuff.
int popen3(int fd[3], pid_t *pid, const char **argv)
int waitResult(int taskID, int tmout=5)
const char * parms[64]
Definition: TaskExec.h:67
bool finished
Definition: TaskExec.h:72
dmTask(dmTaskExec *wheretolog)
int waitFinished(int tmout=5)
Wait until the task has finished or the timeout is expired.
void goCmd(int id)
void assignCmd(dmTask *task, std::vector< std::string > &args)
Split the command string into the single parms.
int submitCmd(std::string cmd)
time_t starttime
Definition: TaskExec.h:71
int fd[3]
Definition: TaskExec.h:74
int taskcnt
Used to create keys to be inserted into the map. This has to be treated modulo MAXINT or similar big ...
Definition: TaskExec.h:175
int getTaskCounters(int &tot, int &running)
void notifyAll()
Definition: TaskExec.h:84
boost::condition_variable condvar
Definition: TaskExec.h:47
std::string stdout
Definition: TaskExec.h:76
std::string instance
Definition: TaskExec.h:102
std::string cmd
Definition: TaskExec.h:66
int getTaskStdout(int taskID, std::string &stdout)
time_t endtime
Definition: TaskExec.h:71
pid_t pid
Definition: TaskExec.h:75
virtual void onTaskCompleted(dmTask &task)
int resultcode
Definition: TaskExec.h:69
virtual void onTaskRunning(dmTask &task)
NOTE the signature. This passes copies of Task objects, not the originals.
dmTask(const dmTask &o)
Definition: TaskExec.h:50
int killTask(int taskID)
Level
Definition: logger.h:50
Namespace for the dmlite C++ API.
Definition: authn.h:15
virtual void onErrLoggingRequest(std::string const &msg)=0
Event invoked internally to log stuff.
dmTask * getTask(int taskID)