SourceXtractorPlusPlus
0.13
Please provide a description of the project.
SEImplementation
src
lib
Measurement
MultithreadedMeasurement.cpp
Go to the documentation of this file.
1
17
/*
18
* MultithreadedMeasurement.cpp
19
*
20
* Created on: May 23, 2018
21
* Author: mschefer
22
*/
23
24
#include <chrono>
#include <csignal>
#include <exception>

#include <ElementsKernel/Logging.h>

#include "SEImplementation/Plugin/SourceIDs/SourceID.h"
#include "SEImplementation/Measurement/MultithreadedMeasurement.h"
30
31
using namespace
SourceXtractor
;
32
33
static
Elements::Logging
logger
=
Elements::Logging::getLogger
(
"Multithreading"
);
34
35
std::recursive_mutex
MultithreadedMeasurement::g_global_mutex
;
36
37
void
MultithreadedMeasurement::startThreads
() {
38
m_output_thread
= Euclid::make_unique<std::thread>(
outputThreadStatic
,
this
);
39
}
40
41
void
MultithreadedMeasurement::waitForThreads
() {
42
m_input_done
=
true
;
43
m_thread_pool
->
block
();
44
m_output_thread
->
join
();
45
logger
.debug() <<
"All worker threads done!"
;
46
}
47
48
void
49
MultithreadedMeasurement::handleMessage
(
const
std::shared_ptr<SourceGroupInterface>
& source_group) {
50
// Force computation of SourceID here, where the order is still deterministic
51
for
(
auto
& source : *source_group) {
52
source.getProperty<
SourceID
>();
53
}
54
55
// Put the new SourceGroup into the input queue
56
auto
order_number =
m_group_counter
;
57
m_thread_pool
->
submit
([
this
, order_number, source_group]() {
58
// Trigger measurements
59
for
(
auto
& source : *source_group) {
60
m_source_to_row
(source);
61
}
62
// Pass to the output thread
63
{
64
std::unique_lock<std::mutex>
output_lock(
m_output_queue_mutex
);
65
m_output_queue
.emplace_back(order_number, source_group);
66
}
67
m_new_output
.
notify_one
();
68
});
69
++
m_group_counter
;
70
}
71
72
/**
 * Entry point of the output thread.
 *
 * Runs outputThreadLoop() on the given instance. If the loop throws, the error
 * is logged as fatal and, unless an abort was already raised (m_abort_raised),
 * SIGTERM is raised to stop the execution.
 *
 * Besides Elements::Exception, any other std::exception is handled in the same
 * way: an exception leaving a thread entry point would otherwise call
 * std::terminate without any log message.
 *
 * @param measurement
 *    The instance whose output loop is run on this thread
 */
void MultithreadedMeasurement::outputThreadStatic(MultithreadedMeasurement* measurement) {
  logger.debug() << "Starting output thread";

  // Shared failure path: log the reason and raise SIGTERM at most once
  auto abort_execution = [measurement](const char* reason) {
    logger.fatal() << "Output thread got an exception!";
    logger.fatal() << reason;
    // exchange() returns the previous value, so only the first failure aborts
    if (!measurement->m_abort_raised.exchange(true)) {
      logger.fatal() << "Aborting the execution";
      ::raise(SIGTERM);
    }
  };

  try {
    measurement->outputThreadLoop();
  }
  catch (const Elements::Exception& e) {
    // Kept as an explicit handler so the pre-existing behaviour is untouched
    abort_execution(e.what());
  }
  catch (const std::exception& e) {
    // Standard exceptions must not escape the thread either
    abort_execution(e.what());
  }

  logger.debug() << "Stopping output thread";
}
87
88
void
MultithreadedMeasurement::outputThreadLoop
() {
89
while
(
true
) {
90
std::unique_lock<std::mutex>
output_lock(
m_output_queue_mutex
);
91
92
// Wait for something in the output queue
93
if
(
m_output_queue
.empty()) {
94
m_new_output
.
wait_for
(output_lock,
std::chrono::milliseconds
(100));
95
}
96
97
// Process the output queue
98
while
(!
m_output_queue
.empty()) {
99
notifyObservers
(
m_output_queue
.front().second);
100
m_output_queue
.pop_front();
101
}
102
103
if
(
m_input_done
&&
m_thread_pool
->
running
() +
m_thread_pool
->
queued
() == 0 &&
104
m_output_queue
.empty()) {
105
break
;
106
}
107
}
108
}
SourceXtractor::Observable< std::shared_ptr< SourceGroupInterface > >::notifyObservers
void notifyObservers(const std::shared_ptr< SourceGroupInterface > &message) const
Definition:
Observable.h:71
std::shared_ptr< SourceGroupInterface >
Elements::Logging
Euclid::ThreadPool::running
size_t running() const
std::chrono::milliseconds
std::recursive_mutex
STL class.
SourceXtractor::MultithreadedMeasurement::handleMessage
void handleMessage(const std::shared_ptr< SourceGroupInterface > &source_group) override
Definition:
MultithreadedMeasurement.cpp:49
SourceXtractor::MultithreadedMeasurement::outputThreadStatic
static void outputThreadStatic(MultithreadedMeasurement *measurement)
Definition:
MultithreadedMeasurement.cpp:72
SourceXtractor::MultithreadedMeasurement::m_input_done
std::atomic_bool m_input_done
Definition:
MultithreadedMeasurement.h:64
SourceXtractor::MultithreadedMeasurement::g_global_mutex
static std::recursive_mutex g_global_mutex
Definition:
MultithreadedMeasurement.h:53
SourceXtractor::MultithreadedMeasurement::m_thread_pool
std::shared_ptr< Euclid::ThreadPool > m_thread_pool
Definition:
MultithreadedMeasurement.h:60
SourceID.h
SourceXtractor::SourceID
Definition:
SourceID.h:33
SourceXtractor::MultithreadedMeasurement::outputThreadLoop
void outputThreadLoop()
Definition:
MultithreadedMeasurement.cpp:88
SourceXtractor
Definition:
Aperture.h:30
Euclid::ThreadPool::queued
size_t queued() const
SourceXtractor::MultithreadedMeasurement::m_abort_raised
std::atomic_bool m_abort_raised
Definition:
MultithreadedMeasurement.h:64
std::unique_lock
STL class.
SourceXtractor::MultithreadedMeasurement::m_source_to_row
SourceToRowConverter m_source_to_row
Definition:
MultithreadedMeasurement.h:59
Elements::Exception
SourceXtractor::MultithreadedMeasurement::m_output_queue_mutex
std::mutex m_output_queue_mutex
Definition:
MultithreadedMeasurement.h:68
SourceXtractor::MultithreadedMeasurement::m_new_output
std::condition_variable m_new_output
Definition:
MultithreadedMeasurement.h:66
std::condition_variable::wait_for
T wait_for(T... args)
SourceXtractor::logger
static auto logger
Definition:
WCS.cpp:44
e
constexpr double e
std::condition_variable::notify_one
T notify_one(T... args)
Elements::Logging::getLogger
static Logging getLogger(const std::string &name="")
Euclid::ThreadPool::block
void block()
SourceXtractor::MultithreadedMeasurement::m_output_queue
std::list< std::pair< int, std::shared_ptr< SourceGroupInterface > > > m_output_queue
Definition:
MultithreadedMeasurement.h:67
SourceXtractor::MultithreadedMeasurement::waitForThreads
void waitForThreads() override
Definition:
MultithreadedMeasurement.cpp:41
MultithreadedMeasurement.h
Logging.h
Euclid::ThreadPool::submit
void submit(Task task)
SourceXtractor::MultithreadedMeasurement::m_output_thread
std::unique_ptr< std::thread > m_output_thread
Definition:
MultithreadedMeasurement.h:61
SourceXtractor::MultithreadedMeasurement::startThreads
void startThreads() override
Definition:
MultithreadedMeasurement.cpp:37
SourceXtractor::MultithreadedMeasurement
Definition:
MultithreadedMeasurement.h:37
SourceXtractor::MultithreadedMeasurement::m_group_counter
int m_group_counter
Definition:
MultithreadedMeasurement.h:63
std::thread::join
T join(T... args)
Generated by
1.8.20