Fawkes API  Fawkes Development Version
robot_memory.cpp
1 /***************************************************************************
2  * robot_memory.cpp - Class for storing and querying information in the RobotMemory
3  *
4  * Created: Aug 23, 2016 1:34:32 PM 2016
5  * Copyright 2016 Frederik Zwilling
6  * 2017 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "robot_memory.h"
23 
24 #include <core/threading/mutex.h>
25 #include <core/threading/mutex_locker.h>
26 #include <interfaces/RobotMemoryInterface.h>
27 #include <plugins/mongodb/utils.h>
28 #include <utils/misc/string_conversions.h>
29 #include <utils/misc/string_split.h>
30 #include <utils/system/hostinfo.h>
31 #ifdef USE_TIMETRACKER
32 # include <utils/time/tracker.h>
33 #endif
34 #include <utils/time/tracker_macros.h>
35 
36 #include <bsoncxx/builder/basic/document.hpp>
37 #include <chrono>
38 #include <mongocxx/client.hpp>
39 #include <mongocxx/exception/operation_exception.hpp>
40 #include <mongocxx/read_preference.hpp>
41 #include <string>
42 #include <thread>
43 
44 using namespace fawkes;
45 using namespace mongocxx;
46 using namespace bsoncxx;
47 
48 /** @class RobotMemory "robot_memory.h"
49  * Access to the robot memory based on mongodb.
50  * Using this class, you can query/insert/remove/update information in
51  * the robot memory. Furthermore, you can register trigger to get
52  * notified when something was changed in the robot memory matching
53  * your query and you can access computables, which are on demand
54  * computed information, by registering the computables and then
55  * querying as if the information would already be in the database.
56  * @author Frederik Zwilling
57  */
58 
59 /**
60  * Robot Memory Constructor with objects of the thread
61  * @param config Fawkes config
62  * @param logger Fawkes logger
63  * @param clock Fawkes clock
64  * @param mongo_connection_manager MongoDBConnCreator to create client connections to the shared and local db
65  * @param blackboard Fawkes blackboard
66  */
68  fawkes::Logger * logger,
69  fawkes::Clock * clock,
70  fawkes::MongoDBConnCreator *mongo_connection_manager,
71  fawkes::BlackBoard * blackboard)
72 {
73  config_ = config;
74  logger_ = logger;
75  clock_ = clock;
76  mongo_connection_manager_ = mongo_connection_manager;
77  blackboard_ = blackboard;
78  mongodb_client_local_ = nullptr;
79  mongodb_client_distributed_ = nullptr;
80  debug_ = false;
81 }
82 
83 RobotMemory::~RobotMemory()
84 {
85  mongo_connection_manager_->delete_client(mongodb_client_local_);
86  mongo_connection_manager_->delete_client(mongodb_client_distributed_);
87  delete trigger_manager_;
88  blackboard_->close(rm_if_);
89 #ifdef USE_TIMETRACKER
90  delete tt_;
91 #endif
92 }
93 
94 void
95 RobotMemory::init()
96 {
97  //load config values
98  log("Started RobotMemory");
99  default_collection_ = "robmem.test";
100  try {
101  default_collection_ = config_->get_string("/plugins/robot-memory/default-collection");
102  } catch (Exception &) {
103  }
104  try {
105  debug_ = config_->get_bool("/plugins/robot-memory/more-debug-output");
106  } catch (Exception &) {
107  }
108  database_name_ = "robmem";
109  try {
110  database_name_ = config_->get_string("/plugins/robot-memory/database");
111  } catch (Exception &) {
112  }
113  distributed_dbs_ = config_->get_strings("/plugins/robot-memory/distributed-db-names");
114 
115  cfg_coord_database_ = config_->get_string("/plugins/robot-memory/coordination/database");
116  cfg_coord_mutex_collection_ =
117  config_->get_string("/plugins/robot-memory/coordination/mutex-collection");
118 
119  using namespace std::chrono_literals;
120 
121  //initiate mongodb connections:
122  log("Connect to local mongod");
123  mongodb_client_local_ = mongo_connection_manager_->create_client("robot-memory-local");
124 
125  if (config_->exists("/plugins/mongodb/clients/robot-memory-distributed/enabled")
126  && config_->get_bool("/plugins/mongodb/clients/robot-memory-distributed/enabled")) {
127  distributed_ = true;
128  log("Connect to distributed mongod");
129  mongodb_client_distributed_ =
130  mongo_connection_manager_->create_client("robot-memory-distributed");
131  }
132 
133  //init blackboard interface
134  rm_if_ = blackboard_->open_for_writing<RobotMemoryInterface>(
135  config_->get_string("/plugins/robot-memory/interface-name").c_str());
136  rm_if_->set_error("");
137  rm_if_->set_result("");
138  rm_if_->write();
139 
140  //Setup event trigger and computables manager
141  trigger_manager_ = new EventTriggerManager(logger_, config_, mongo_connection_manager_);
142  computables_manager_ = new ComputablesManager(config_, this);
143 
144  log_deb("Initialized RobotMemory");
145 
146 #ifdef USE_TIMETRACKER
147  tt_ = new TimeTracker();
148  tt_loopcount_ = 0;
149  ttc_events_ = tt_->add_class("RobotMemory Events");
150  ttc_cleanup_ = tt_->add_class("RobotMemory Cleanup");
151 #endif
152 }
153 
154 void
155 RobotMemory::loop()
156 {
157  TIMETRACK_START(ttc_events_);
158  trigger_manager_->check_events();
159  TIMETRACK_END(ttc_events_);
160  TIMETRACK_START(ttc_cleanup_);
161  computables_manager_->cleanup_computed_docs();
162  TIMETRACK_END(ttc_cleanup_);
163 #ifdef USE_TIMETRACKER
164  if (++tt_loopcount_ % 5 == 0) {
165  tt_->print_to_stdout();
166  }
167 #endif
168 }
169 
170 /**
171  * Query information from the robot memory.
172  * @param query The query returned documents have to match (essentially a BSONObj)
173  * @param collection_name The database and collection to query as string (e.g. robmem.worldmodel)
174  * @param query_options Optional options to use to query the database
175  * @return Cursor to get the documents from, NULL for invalid query
176  */
177 cursor
178 RobotMemory::query(document::view query,
179  const std::string & collection_name,
180  mongocxx::options::find query_options)
181 {
182  collection collection = get_collection(collection_name);
183  log_deb(std::string("Executing Query " + to_json(query) + " on collection " + collection_name));
184 
185  //check if computation on demand is necessary and execute Computables
186  computables_manager_->check_and_compute(query, collection_name);
187 
188  //lock (mongo_client not thread safe)
189  MutexLocker lock(mutex_);
190 
191  //actually execute query
192  try {
193  return collection.find(query, query_options);
194  } catch (mongocxx::operation_exception &e) {
195  std::string error =
196  std::string("Error for query ") + to_json(query) + "\n Exception: " + e.what();
197  log(error, "error");
198  throw;
199  }
200 }
201 
202 /**
203  * Inserts a document into the robot memory
204  * @param doc A view of the document to insert
205  * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
206  * @return 1: Success 0: Error
207  */
208 int
209 RobotMemory::insert(bsoncxx::document::view doc, const std::string &collection_name)
210 {
211  collection collection = get_collection(collection_name);
212  log_deb(std::string("Inserting " + to_json(doc) + " into collection " + collection_name));
213  //lock (mongo_client not thread safe)
214  MutexLocker lock(mutex_);
215  //actually execute insert
216  try {
217  collection.insert_one(doc);
218  } catch (mongocxx::operation_exception &e) {
219  std::string error = "Error for insert " + to_json(doc) + "\n Exception: " + e.what();
220  log_deb(error, "error");
221  return 0;
222  }
223  //return success
224  return 1;
225 }
226 
227 /** Create an index on a collection.
228  * @param keys The keys document
229  * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
230  * @param unique true to create unique index
231  * @return 1: Success 0: Error
232  */
233 int
234 RobotMemory::create_index(bsoncxx::document::view keys,
235  const std::string & collection_name,
236  bool unique)
237 {
238  collection collection = get_collection(collection_name);
239 
240  log_deb(std::string("Creating index " + to_json(keys) + " on collection " + collection_name));
241 
242  //lock (mongo_client not thread safe)
243  MutexLocker lock(mutex_);
244 
245  //actually execute insert
246  try {
247  using namespace bsoncxx::builder::basic;
248  collection.create_index(keys, make_document(kvp("unique", unique)));
249  } catch (operation_exception &e) {
250  std::string error = "Error when creating index " + to_json(keys) + "\n Exception: " + e.what();
251  log_deb(error, "error");
252  return 0;
253  }
254  //return success
255  return 1;
256 }
257 
258 /**
259  * Inserts all document of a vector into the robot memory
260  * @param docs The vector of BSON documents as views
261  * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
262  * @return 1: Success 0: Error
263  */
264 int
265 RobotMemory::insert(std::vector<bsoncxx::document::view> docs, const std::string &collection_name)
266 {
267  collection collection = get_collection(collection_name);
268  std::string insert_string = "[";
269  for (auto &&doc : docs) {
270  insert_string += to_json(doc) + ",\n";
271  }
272  insert_string += "]";
273 
274  log_deb(std::string("Inserting vector of documents " + insert_string + " into collection "
275  + collection_name));
276 
277  //lock (mongo_client not thread safe)
278  MutexLocker lock(mutex_);
279 
280  //actually execute insert
281  try {
282  collection.insert_many(docs);
283  } catch (operation_exception &e) {
284  std::string error = "Error for insert " + insert_string + "\n Exception: " + e.what();
285  log_deb(error, "error");
286  return 0;
287  }
288  //return success
289  return 1;
290 }
291 
292 /**
293  * Inserts a document into the robot memory
294  * @param obj_str The document as json string
295  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
296  * @return 1: Success 0: Error
297  */
298 int
299 RobotMemory::insert(const std::string &obj_str, const std::string &collection)
300 {
301  return insert(from_json(obj_str), collection);
302 }
303 
304 /**
305  * Updates documents in the robot memory
306  * @param query The query defining which documents to update
307  * @param update What to change in these documents
308  * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
309  * @param upsert Should the update document be inserted if the query returns no documents?
310  * @return 1: Success 0: Error
311  */
312 int
313 RobotMemory::update(const bsoncxx::document::view &query,
314  const bsoncxx::document::view &update,
315  const std::string & collection_name,
316  bool upsert)
317 {
318  collection collection = get_collection(collection_name);
319  log_deb(std::string("Executing Update " + to_json(update) + " for query " + to_json(query)
320  + " on collection " + collection_name));
321 
322  //lock (mongo_client not thread safe)
323  MutexLocker lock(mutex_);
324 
325  //actually execute update
326  try {
327  collection.update_many(query,
328  builder::basic::make_document(
329  builder::basic::kvp("$set", builder::concatenate(update))),
330  options::update().upsert(upsert));
331  } catch (operation_exception &e) {
332  log_deb(std::string("Error for update " + to_json(update) + " for query " + to_json(query)
333  + "\n Exception: " + e.what()),
334  "error");
335  return 0;
336  }
337  //return success
338  return 1;
339 }
340 
341 /**
342  * Updates documents in the robot memory
343  * @param query The query defining which documents to update
344  * @param update_str What to change in these documents as json string
345  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
346  * @param upsert Should the update document be inserted if the query returns no documents?
347  * @return 1: Success 0: Error
348  */
349 int
350 RobotMemory::update(const bsoncxx::document::view &query,
351  const std::string & update_str,
352  const std::string & collection,
353  bool upsert)
354 {
355  return update(query, from_json(update_str), collection, upsert);
356 }
357 
358 /** Atomically update and retrieve document.
359  * @param filter The filter defining the document to update.
360  * If multiple match takes the first one.
361  * @param update Update statement. May only contain update operators!
362  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
363  * @param upsert Should the update document be inserted if the query returns no documents?
364  * @param return_new return the document before (false) or after (true) the update has been applied?
365  * @return document, depending on @p return_new either before or after the udpate has been applied.
366  */
367 document::value
368 RobotMemory::find_one_and_update(const document::view &filter,
369  const document::view &update,
370  const std::string & collection_name,
371  bool upsert,
372  bool return_new)
373 {
374  collection collection = get_collection(collection_name);
375 
376  log_deb(std::string("Executing findOneAndUpdate " + to_json(update) + " for filter "
377  + to_json(filter) + " on collection " + collection_name));
378 
379  MutexLocker lock(mutex_);
380 
381  try {
382  auto res =
383  collection.find_one_and_update(filter,
384  update,
385  options::find_one_and_update().upsert(upsert).return_document(
386  return_new ? options::return_document::k_after
387  : options::return_document::k_before));
388  if (res) {
389  return *res;
390  } else {
391  std::string error = "Error for update " + to_json(update) + " for query " + to_json(filter)
392  + "FindOneAndUpdate unexpectedly did not return a document";
393  log_deb(error, "warn");
394  return bsoncxx::builder::basic::make_document(bsoncxx::builder::basic::kvp("error", error));
395  }
396  } catch (operation_exception &e) {
397  std::string error = "Error for update " + to_json(update) + " for query " + to_json(filter)
398  + "\n Exception: " + e.what();
399  log_deb(error, "error");
400  return bsoncxx::builder::basic::make_document(bsoncxx::builder::basic::kvp("error", error));
401  }
402 }
403 
404 /**
405  * Remove documents from the robot memory
406  * @param query Which documents to remove
407  * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
408  * @return 1: Success 0: Error
409  */
410 int
411 RobotMemory::remove(const bsoncxx::document::view &query, const std::string &collection_name)
412 {
413  //lock (mongo_client not thread safe)
414  MutexLocker lock(mutex_);
415  collection collection = get_collection(collection_name);
416  log_deb(std::string("Executing Remove " + to_json(query) + " on collection " + collection_name));
417  //actually execute remove
418  try {
419  collection.delete_many(query);
420  } catch (operation_exception &e) {
421  log_deb(std::string("Error for query " + to_json(query) + "\n Exception: " + e.what()),
422  "error");
423  return 0;
424  }
425  //return success
426  return 1;
427 }
428 
429 /**
430  * Performs a MapReduce operation on the robot memory (https://docs.mongodb.com/manual/core/map-reduce/)
431  * @param query Which documents to use for the map step
432  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
433  * @param js_map_fun Map function in JavaScript as string
434  * @param js_reduce_fun Reduce function in JavaScript as string
435  * @return BSONObj containing the result
436  */
437 bsoncxx::document::value
438 RobotMemory::mapreduce(const bsoncxx::document::view &query,
439  const std::string & collection,
440  const std::string & js_map_fun,
441  const std::string & js_reduce_fun)
442 {
443  throw Exception("Not implemented");
444  /*
445  mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
446  MutexLocker lock(mutex_);
447  log_deb(std::string("Executing MapReduce " + query.toString() + " on collection " + collection
448  + " map: " + js_map_fun + " reduce: " + js_reduce_fun));
449  return mongodb_client->mapreduce(collection, js_map_fun, js_reduce_fun, query);
450  */
451 }
452 
453 /**
454  * Performs an aggregation operation on the robot memory (https://docs.mongodb.com/v3.2/reference/method/db.collection.aggregate/)
455  * @param pipeline A sequence of data aggregation operations or stages. See the https://docs.mongodb.com/v3.2/reference/operator/aggregation-pipeline/ for details
456  * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
457  * @return Cursor to get the documents from, NULL for invalid pipeline
458  */
459 cursor
460 RobotMemory::aggregate(mongocxx::pipeline &pipeline, const std::string &collection_name)
461 {
462  collection collection = get_collection(collection_name);
463  log_deb(std::string("Aggregating in collection " + collection_name));
464  //lock (mongo_client not thread safe)
465  MutexLocker lock(mutex_);
466  //actually execute aggregate
467  try {
468  return collection.aggregate(pipeline, mongocxx::options::aggregate{});
469  } catch (operation_exception &e) {
470  std::string error =
471  std::string("Error when aggregating " + to_json(pipeline.view_array()) + "\n Exception: ")
472  + e.what();
473  log_deb(error, "error");
474  throw;
475  }
476 }
477 
478 /**
479  * Drop (= remove) a whole collection and all documents inside it
480  * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
481  * @return 1: Success 0: Error
482  */
483 int
484 RobotMemory::drop_collection(const std::string &collection_name)
485 {
486  MutexLocker lock(mutex_);
487  collection collection = get_collection(collection_name);
488  log_deb("Dropping collection " + collection_name);
489  collection.drop();
490  return 1;
491 }
492 
493 /**
494  * Remove the whole database of the robot memory and all documents inside
495  * @return 1: Success 0: Error
496  */
497 int
499 {
500  //lock (mongo_client not thread safe)
501  MutexLocker lock(mutex_);
502 
503  log_deb("Clearing whole robot memory");
504  mongodb_client_local_->database(database_name_).drop();
505  return 1;
506 }
507 
508 /**
509  * Restore a previously dumped collection from a directory
510  * @param dbcollection The database and collection to use as string (e.g.
511  * robmem.worldmodel)
512  * @param directory Directory of the dump
513  * @param target_dbcollection Optional different database and collection where
514  * the dump is restored to. If not set, the dump will be restored in the
515  * previous place
516  * @return 1: Success 0: Error
517  */
518 int
519 RobotMemory::restore_collection(const std::string &dbcollection,
520  const std::string &directory,
521  std::string target_dbcollection)
522 {
523  if (target_dbcollection == "") {
524  target_dbcollection = dbcollection;
525  }
526 
527  drop_collection(target_dbcollection);
528 
529  //lock (mongo_client not thread safe)
530  MutexLocker lock(mutex_);
531 
532  auto [db, collection] = split_db_collection_string(dbcollection);
533  std::string path =
534  StringConversions::resolve_path(directory) + "/" + db + "/" + collection + ".bson";
535  log_deb(std::string("Restore collection " + collection + " from " + path), "warn");
536 
537  auto [target_db, target_collection] = split_db_collection_string(target_dbcollection);
538 
539  //call mongorestore from folder with initial restores
540  std::string command = "/usr/bin/mongorestore --dir " + path + " -d " + target_db + " -c "
541  + target_collection + " --host=" + get_hostport(dbcollection);
542  log_deb(std::string("Restore command: " + command), "warn");
543  FILE *bash_output = popen(command.c_str(), "r");
544 
545  //check if output is ok
546  if (!bash_output) {
547  log(std::string("Unable to restore collection" + collection), "error");
548  return 0;
549  }
550  std::string output_string = "";
551  char buffer[100];
552  while (!feof(bash_output)) {
553  if (fgets(buffer, 100, bash_output) == NULL) {
554  break;
555  }
556  output_string += buffer;
557  }
558  pclose(bash_output);
559  if (output_string.find("Failed") != std::string::npos) {
560  log(std::string("Unable to restore collection" + collection), "error");
561  log_deb(output_string, "error");
562  return 0;
563  }
564  return 1;
565 }
566 
567 /**
568  * Dump (= save) a collection to the filesystem to restore it later
569  * @param dbcollection The database and collection to use as string (e.g. robmem.worldmodel)
570  * @param directory Directory to dump the collection to
571  * @return 1: Success 0: Error
572  */
573 int
574 RobotMemory::dump_collection(const std::string &dbcollection, const std::string &directory)
575 {
576  //lock (mongo_client not thread safe)
577  MutexLocker lock(mutex_);
578 
579  std::string path = StringConversions::resolve_path(directory);
580  log_deb(std::string("Dump collection " + dbcollection + " into " + path), "warn");
581 
582  auto [db, collection] = split_db_collection_string(dbcollection);
583 
584  std::string command = "/usr/bin/mongodump --out=" + path + " --db=" + db
585  + " --collection=" + collection + " --forceTableScan"
586  + " --host=" + get_hostport(dbcollection);
587  log(std::string("Dump command: " + command), "info");
588  FILE *bash_output = popen(command.c_str(), "r");
589  //check if output is ok
590  if (!bash_output) {
591  log(std::string("Unable to dump collection" + collection), "error");
592  return 0;
593  }
594  std::string output_string = "";
595  char buffer[100];
596  while (!feof(bash_output)) {
597  if (fgets(buffer, 100, bash_output) == NULL) {
598  break;
599  }
600  output_string += buffer;
601  }
602  pclose(bash_output);
603  if (output_string.find("Failed") != std::string::npos) {
604  log(std::string("Unable to dump collection" + collection), "error");
605  log_deb(output_string, "error");
606  return 0;
607  }
608  return 1;
609 }
610 
611 void
612 RobotMemory::log(const std::string &what, const std::string &info)
613 {
614  if (!info.compare("error"))
615  logger_->log_error(name_, "%s", what.c_str());
616  else if (!info.compare("warn"))
617  logger_->log_warn(name_, "%s", what.c_str());
618  else if (!info.compare("debug"))
619  logger_->log_debug(name_, "%s", what.c_str());
620  else
621  logger_->log_info(name_, "%s", what.c_str());
622 }
623 
624 void
625 RobotMemory::log_deb(const std::string &what, const std::string &level)
626 {
627  if (debug_) {
628  log(what, level);
629  }
630 }
631 
632 void
633 RobotMemory::log_deb(const bsoncxx::document::view &query,
634  const std::string & what,
635  const std::string & level)
636 {
637  if (debug_) {
638  log(query, what, level);
639  }
640 }
641 
642 void
643 RobotMemory::log(const bsoncxx::document::view &query,
644  const std::string & what,
645  const std::string & level)
646 {
647  log(what + " " + to_json(query), level);
648 }
649 
650 /** Check if the given database is a distributed database
651  * @param dbcollection A database collection name pair of the form <dbname>.<collname>
652  * @return true iff the database is distributed database
653  */
654 bool
655 RobotMemory::is_distributed_database(const std::string &dbcollection)
656 {
657  return std::find(distributed_dbs_.begin(),
658  distributed_dbs_.end(),
659  split_db_collection_string(dbcollection).first)
660  != distributed_dbs_.end();
661 }
662 
663 std::string
664 RobotMemory::get_hostport(const std::string &dbcollection)
665 {
666  if (distributed_ && is_distributed_database(dbcollection)) {
667  return config_->get_string("/plugins/mongodb/clients/robot-memory-distributed-direct/hostport");
668  } else {
669  return config_->get_string("/plugins/mongodb/clients/robot-memory-local-direct/hostport");
670  }
671 }
672 
673 /**
674  * Get the mongodb client associated with the collection (eighter the local or distributed one)
675  * @param collection The collection name in the form "<dbname>.<collname>"
676  * @return A pointer to the client for the database with name <dbname>
677  */
678 client *
679 RobotMemory::get_mongodb_client(const std::string &collection)
680 {
681  if (!distributed_) {
682  return mongodb_client_local_;
683  }
684  if (is_distributed_database(collection)) {
685  return mongodb_client_distributed_;
686  } else {
687  return mongodb_client_local_;
688  }
689 }
690 
691 /**
692  * Get the collection object referred to by the given string.
693  * @param dbcollection The name of the collection in the form <dbname>.<collname>
694  * @return The respective collection object
695  */
696 
697 collection
698 RobotMemory::get_collection(const std::string &dbcollection)
699 {
700  auto db_coll_pair = split_db_collection_string(dbcollection);
701  client *client;
702  if (is_distributed_database(dbcollection)) {
703  client = mongodb_client_distributed_;
704  } else {
705  client = mongodb_client_local_;
706  }
707  return client->database(db_coll_pair.first)[db_coll_pair.second];
708 }
709 
710 /**
711  * Remove a previously registered trigger
712  * @param trigger Pointer to the trigger to remove
713  */
714 void
716 {
717  trigger_manager_->remove_trigger(trigger);
718 }
719 
720 /**
721  * Remove previously registered computable
722  * @param computable The computable to remove
723  */
724 void
726 {
727  computables_manager_->remove_computable(computable);
728 }
729 
730 /** Explicitly create a mutex.
731  * This is an optional step, a mutex is also created automatically when trying
732  * to acquire the lock for the first time. Adding it explicitly may increase
733  * visibility, e.g., in the database. Use it for mutexes which are locked
734  * only very infrequently.
735  * @param name mutex name
736  * @return true if operation was successful, false on failure
737  */
738 bool
739 RobotMemory::mutex_create(const std::string &name)
740 {
741  client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
742  using namespace bsoncxx::builder;
743  basic::document insert_doc{};
744  insert_doc.append(basic::kvp("$currentDate", [](basic::sub_document subdoc) {
745  subdoc.append(basic::kvp("lock-time", true));
746  }));
747  insert_doc.append(basic::kvp("_id", name));
748  insert_doc.append(basic::kvp("locked", false));
749  try {
750  MutexLocker lock(mutex_);
751  collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
752  auto write_concern = mongocxx::write_concern();
753  write_concern.majority(std::chrono::milliseconds(0));
754  collection.insert_one(insert_doc.view(), options::insert().write_concern(write_concern));
755  return true;
756  } catch (operation_exception &e) {
757  logger_->log_info(name_, "Failed to create mutex %s: %s", name.c_str(), e.what());
758  return false;
759  }
760 }
761 
762 /** Destroy a mutex.
763  * The mutex is erased from the database. This is done disregarding it's current
764  * lock state.
765  * @param name mutex name
766  * @return true if operation was successful, false on failure
767  */
768 bool
769 RobotMemory::mutex_destroy(const std::string &name)
770 {
771  client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
772  using namespace bsoncxx::builder;
773  basic::document destroy_doc;
774  destroy_doc.append(basic::kvp("_id", name));
775  try {
776  MutexLocker lock(mutex_);
777  collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
778  auto write_concern = mongocxx::write_concern();
779  write_concern.majority(std::chrono::milliseconds(0));
780  collection.delete_one(destroy_doc.view(),
781  options::delete_options().write_concern(write_concern));
782  return true;
783  } catch (operation_exception &e) {
784  logger_->log_info(name_, "Failed to destroy mutex %s: %s", name.c_str(), e.what());
785  return false;
786  }
787 }
788 
789 /** Try to acquire a lock for a mutex.
790  * This will access the database and atomically find and update (or
791  * insert) a mutex lock. If the mutex has not been created it is added
792  * automatically. If the lock cannot be acquired the function also
793  * returns immediately. There is no blocked waiting for the lock.
794  * @param name mutex name
795  * @param identity string to set as lock-holder
796  * @param force true to force acquisition of the lock, i.e., even if
797  * the lock has already been acquired take ownership (steal the lock).
798  * @return true if operation was successful, false on failure
799  */
800 bool
801 RobotMemory::mutex_try_lock(const std::string &name, const std::string &identity, bool force)
802 {
803  client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
804 
805  std::string locked_by{identity};
806  if (identity.empty()) {
807  HostInfo host_info;
808  locked_by = host_info.name();
809  }
810 
811  // here we can add an $or to implement lock timeouts
812  using namespace bsoncxx::builder;
813  basic::document filter_doc;
814  filter_doc.append(basic::kvp("_id", name));
815  if (!force) {
816  filter_doc.append(basic::kvp("locked", false));
817  }
818 
819  basic::document update_doc;
820  update_doc.append(basic::kvp("$currentDate", [](basic::sub_document subdoc) {
821  subdoc.append(basic::kvp("lock-time", true));
822  }));
823  update_doc.append(basic::kvp("$set", [locked_by](basic::sub_document subdoc) {
824  subdoc.append(basic::kvp("locked", true));
825  subdoc.append(basic::kvp("locked-by", locked_by));
826  }));
827  try {
828  MutexLocker lock(mutex_);
829  collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
830  auto write_concern = mongocxx::write_concern();
831  write_concern.majority(std::chrono::milliseconds(0));
832  auto new_doc =
833  collection.find_one_and_update(filter_doc.view(),
834  update_doc.view(),
835  options::find_one_and_update()
836  .upsert(true)
837  .return_document(options::return_document::k_after)
838  .write_concern(write_concern));
839 
840  if (!new_doc) {
841  return false;
842  }
843  auto new_view = new_doc->view();
844  return (new_view["locked-by"].get_utf8().value.to_string() == locked_by
845  && new_view["locked"].get_bool());
846 
847  } catch (operation_exception &e) {
848  logger_->log_error(name_, "Mongo OperationException: %s", e.what());
849  try {
850  // TODO is this extrac check still needed?
851  basic::document check_doc;
852  check_doc.append(basic::kvp("_id", name));
853  check_doc.append(basic::kvp("locked", true));
854  check_doc.append(basic::kvp("locked-by", locked_by));
855  MutexLocker lock(mutex_);
856  collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
857  auto res = collection.find_one(check_doc.view());
858  logger_->log_info(name_, "Checking whether mutex was acquired succeeded");
859  if (res) {
860  logger_->log_warn(name_,
861  "Exception during try-lock for %s, "
862  "but mutex was still acquired",
863  name.c_str());
864  } else {
865  logger_->log_info(name_,
866  "Exception during try-lock for %s, "
867  "and mutex was not acquired",
868  name.c_str());
869  }
870  return static_cast<bool>(res);
871  } catch (operation_exception &e) {
872  logger_->log_error(name_,
873  "Mongo OperationException while handling "
874  "the first exception: %s",
875  e.what());
876  return false;
877  }
878  }
879 }
880 
881 /** Try to acquire a lock for a mutex.
882  * This will access the database and atomically find and update (or
883  * insert) a mutex lock. If the mutex has not been created it is added
884  * automatically. If the lock cannot be acquired the function also
885  * returns immediately. There is no blocked waiting for the lock.
886  * @param name mutex name
887  * @param force true to force acquisition of the lock, i.e., even if
888  * the lock has already been acquired take ownership (steal the lock).
889  * @return true if operation was successful, false on failure
890  */
891 bool
892 RobotMemory::mutex_try_lock(const std::string &name, bool force)
893 {
894  return mutex_try_lock(name, "", force);
895 }
896 
897 /** Release lock on mutex.
898  * @param name mutex name
899  * @param identity string to set as lock-holder
900  * @return true if operation was successful, false on failure
901  */
902 bool
903 RobotMemory::mutex_unlock(const std::string &name, const std::string &identity)
904 {
905  client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
906 
907  std::string locked_by{identity};
908  if (identity.empty()) {
909  HostInfo host_info;
910  locked_by = host_info.name();
911  }
912 
913  using namespace bsoncxx::builder;
914  // here we can add an $or to implement lock timeouts
915  basic::document filter_doc;
916  filter_doc.append(basic::kvp("_id", name));
917  filter_doc.append(basic::kvp("locked-by", locked_by));
918 
919  basic::document update_doc;
920  update_doc.append(basic::kvp("$set", [](basic::sub_document subdoc) {
921  subdoc.append(basic::kvp("locked", false));
922  }));
923  update_doc.append(basic::kvp("$unset", [](basic::sub_document subdoc) {
924  subdoc.append(basic::kvp("locked-by", true));
925  subdoc.append(basic::kvp("lock-time", true));
926  }));
927 
928  try {
929  MutexLocker lock(mutex_);
930  collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
931  auto write_concern = mongocxx::write_concern();
932  write_concern.majority(std::chrono::milliseconds(0));
933  auto new_doc =
934  collection.find_one_and_update(filter_doc.view(),
935  update_doc.view(),
936  options::find_one_and_update()
937  .upsert(true)
938  .return_document(options::return_document::k_after)
939  .write_concern(write_concern));
940  if (!new_doc) {
941  return false;
942  }
943  return new_doc->view()["locked"].get_bool();
944  } catch (operation_exception &e) {
945  return false;
946  }
947 }
948 
949 /** Renew a mutex.
950  * Renewing means updating the lock timestamp to the current time to
951  * avoid expiration. Note that the lock must currently be held by
952  * the given identity.
953  * @param name mutex name
954  * @param identity string to set as lock-holder (defaults to hostname
955  * if empty)
956  * @return true if operation was successful, false on failure
957  */
958 bool
959 RobotMemory::mutex_renew_lock(const std::string &name, const std::string &identity)
960 {
961  client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
962 
963  std::string locked_by{identity};
964  if (identity.empty()) {
965  HostInfo host_info;
966  locked_by = host_info.name();
967  }
968 
969  using namespace bsoncxx::builder;
970  // here we can add an $or to implement lock timeouts
971  basic::document filter_doc;
972  filter_doc.append(basic::kvp("_id", name));
973  filter_doc.append(basic::kvp("locked", true));
974  filter_doc.append(basic::kvp("locked-by", locked_by));
975 
976  // we set all data, even the data which is not actually modified, to
977  // make it easier to process the update in triggers.
978  basic::document update_doc;
979  update_doc.append(basic::kvp("$currentDate", [](basic::sub_document subdoc) {
980  subdoc.append(basic::kvp("lock-time", true));
981  }));
982  update_doc.append(basic::kvp("$set", [locked_by](basic::sub_document subdoc) {
983  subdoc.append(basic::kvp("locked", true));
984  subdoc.append(basic::kvp("locked-by", locked_by));
985  }));
986 
987  try {
988  MutexLocker lock(mutex_);
989  collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
990  auto write_concern = mongocxx::write_concern();
991  write_concern.majority(std::chrono::milliseconds(0));
992  auto new_doc =
993  collection.find_one_and_update(filter_doc.view(),
994  update_doc.view(),
995  options::find_one_and_update()
996  .upsert(false)
997  .return_document(options::return_document::k_after)
998  .write_concern(write_concern));
999  return static_cast<bool>(new_doc);
1000  } catch (operation_exception &e) {
1001  logger_->log_warn(name_, "Renewing lock on mutex %s failed: %s", name.c_str(), e.what());
1002  return false;
1003  }
1004 }
1005 
1006 /** Setup time-to-live index for mutexes.
1007  * Setting up a time-to-live index for mutexes enables automatic
1008  * expiration through the database. Note, however, that the documents
1009  * are expired only every 60 seconds. This has two consequences:
1010  * - max_age_sec lower than 60 seconds cannot be achieved
1011  * - locks may be held for up to just below 60 seconds longer than
1012  * configured, i.e., if the mutex had not yet expired when the
1013  * background tasks runs.
1014  * @param max_age_sec maximum age of locks in seconds
1015  * @return true if operation was successful, false on failure
1016  */
1017 bool
1019 {
1020  MutexLocker lock(mutex_);
1021 
1022  client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1023 
1024  auto keys = builder::basic::make_document(builder::basic::kvp("lock-time", true));
1025 
1026  try {
1027  collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
1028  collection.create_index(keys.view(),
1029  builder::basic::make_document(
1030  builder::basic::kvp("expireAfterSeconds", max_age_sec)));
1031  } catch (operation_exception &e) {
1032  logger_->log_warn(name_, "Creating TTL index failed: %s", e.what());
1033  return false;
1034  }
1035  return true;
1036 }
1037 
1038 /** Expire old locks on mutexes.
1039  * This will update the database and set all mutexes to unlocked for
1040  * which the lock-time is older than the given maximum age.
1041  * @param max_age_sec maximum age of locks in seconds
1042  * @return true if operation was successful, false on failure
1043  */
1044 bool
1046 {
1047  client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1048 
1049  using std::chrono::high_resolution_clock;
1050  using std::chrono::milliseconds;
1051  using std::chrono::time_point;
1052  using std::chrono::time_point_cast;
1053 
1054  auto max_age_ms = milliseconds(static_cast<unsigned long int>(std::floor(max_age_sec * 1000)));
1055  time_point<high_resolution_clock, milliseconds> expire_before =
1056  time_point_cast<milliseconds>(high_resolution_clock::now()) - max_age_ms;
1057  types::b_date expire_before_mdb(expire_before);
1058 
1059  // here we can add an $or to implement lock timeouts
1060  using namespace bsoncxx::builder;
1061  basic::document filter_doc;
1062  filter_doc.append(basic::kvp("locked", true));
1063  filter_doc.append(basic::kvp("lock-time", [expire_before_mdb](basic::sub_document subdoc) {
1064  subdoc.append(basic::kvp("$lt", expire_before_mdb));
1065  }));
1066 
1067  basic::document update_doc;
1068  update_doc.append(basic::kvp("$set", [](basic::sub_document subdoc) {
1069  subdoc.append(basic::kvp("locked", false));
1070  }));
1071  update_doc.append(basic::kvp("$unset", [](basic::sub_document subdoc) {
1072  subdoc.append(basic::kvp("locked-by", true));
1073  subdoc.append(basic::kvp("lock-time", true));
1074  }));
1075 
1076  try {
1077  MutexLocker lock(mutex_);
1078  collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
1079  auto write_concern = mongocxx::write_concern();
1080  write_concern.majority(std::chrono::milliseconds(0));
1081  collection.update_many(filter_doc.view(),
1082  update_doc.view(),
1083  options::update().write_concern(write_concern));
1084  return true;
1085  } catch (operation_exception &e) {
1086  log(std::string("Failed to expire locks: " + std::string(e.what())), "error");
1087  return false;
1088  }
1089 }
Class holding information for a single computable this class also enhances computed documents by addi...
Definition: computable.h:32
This class manages registering computables and can check if any computables are invoced by a query.
Manager to realize triggers on events in the robot memory.
Class holding all information about an EventTrigger.
Definition: event_trigger.h:32
bool mutex_create(const std::string &name)
Explicitly create a mutex.
int dump_collection(const std::string &dbcollection, const std::string &directory="@CONFDIR@/robot-memory")
Dump (= save) a collection to the filesystem to restore it later.
bool mutex_destroy(const std::string &name)
Destroy a mutex.
bool mutex_renew_lock(const std::string &name, const std::string &identity)
Renew a mutex.
bool mutex_unlock(const std::string &name, const std::string &identity)
Release lock on mutex.
void remove_trigger(EventTrigger *trigger)
Remove a previously registered trigger.
mongocxx::cursor query(bsoncxx::document::view query, const std::string &collection_name="", mongocxx::options::find query_options=mongocxx::options::find())
Query information from the robot memory.
bsoncxx::document::value find_one_and_update(const bsoncxx::document::view &filter, const bsoncxx::document::view &update, const std::string &collection, bool upsert=false, bool return_new=true)
Atomically update and retrieve document.
bool mutex_setup_ttl(float max_age_sec)
Setup time-to-live index for mutexes.
bsoncxx::document::value mapreduce(const bsoncxx::document::view &query, const std::string &collection, const std::string &js_map_fun, const std::string &js_reduce_fun)
Performs a MapReduce operation on the robot memory (https://docs.mongodb.com/manual/core/map-reduce/)
int remove(const bsoncxx::document::view &query, const std::string &collection="")
Remove documents from the robot memory.
int insert(bsoncxx::document::view, const std::string &collection="")
Inserts a document into the robot memory.
mongocxx::cursor aggregate(mongocxx::pipeline &pipeline, const std::string &collection="")
Performs an aggregation operation on the robot memory (https://docs.mongodb.com/v3....
void remove_computable(Computable *computable)
Remove previously registered computable.
int drop_collection(const std::string &collection)
Drop (= remove) a whole collection and all documents inside it.
int update(const bsoncxx::document::view &query, const bsoncxx::document::view &update, const std::string &collection="", bool upsert=false)
Updates documents in the robot memory.
bool mutex_try_lock(const std::string &name, bool force=false)
Try to acquire a lock for a mutex.
int create_index(bsoncxx::document::view keys, const std::string &collection="", bool unique=false)
Create an index on a collection.
int clear_memory()
Remove the whole database of the robot memory and all documents inside.
RobotMemory(fawkes::Configuration *config, fawkes::Logger *logger, fawkes::Clock *clock, fawkes::MongoDBConnCreator *mongo_connection_manager, fawkes::BlackBoard *blackboard)
Robot Memory Constructor with objects of the thread.
int restore_collection(const std::string &dbcollection, const std::string &directory="@CONFDIR@/robot-memory", std::string target_dbcollection="")
Restore a previously dumped collection from a directory.
bool mutex_expire_locks(float max_age_sec)
Expire old locks on mutexes.
The BlackBoard abstract class.
Definition: blackboard.h:46
This is supposed to be the central clock in Fawkes.
Definition: clock.h:35
Interface for configuration handling.
Definition: config.h:68
Base class for exceptions in Fawkes.
Definition: exception.h:36
Host information.
Definition: hostinfo.h:32
const char * name()
Get full hostname.
Definition: hostinfo.cpp:100
Interface for logging.
Definition: logger.h:42
Interface for a MongoDB connection creator.
Mutex locking helper.
Definition: mutex_locker.h:34
static std::string resolve_path(std::string s)
Resolves path-string with @...@ tags.
Time tracking utility.
Definition: tracker.h:37
Fawkes library namespace.