Fawkes API  Fawkes Development Version
qa_bb_remote.cpp
1 
2 /***************************************************************************
3  * qa_bb_remote.cpp - BlackBoard remote access QA
4  *
5  * Created: Mon Mar 03 17:31:18 2008
6  * Copyright 2006-2008 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 /// @cond QA
25 
26 #include <blackboard/bbconfig.h>
27 #include <blackboard/exceptions.h>
28 #include <blackboard/interface_listener.h>
29 #include <blackboard/local.h>
30 #include <blackboard/remote.h>
31 #include <core/exceptions/system.h>
32 #include <interface/interface_info.h>
33 #include <interfaces/TestInterface.h>
34 #include <netcomm/fawkes/client.h>
35 #include <netcomm/fawkes/server_thread.h>
36 #include <utils/time/time.h>
37 
38 #include <cstdio>
39 #include <cstdlib>
40 #include <cstring>
41 #include <iostream>
42 #include <signal.h>
43 #include <vector>
44 
45 using namespace std;
46 using namespace fawkes;
47 
/** Termination request flag.
 * Set by signal_handler() and polled by the test loops; main() resets it
 * between test stages. It is written from a signal handler, therefore it
 * must be a volatile sig_atomic_t rather than a plain bool.
 */
volatile sig_atomic_t quit = 0;

/** Signal handler: request the currently running test loop to stop.
 * Only sets the quit flag, which is the only thing that is safe to do here.
 * @param signum number of the delivered signal (unused)
 */
void
signal_handler(int /* signum */)
{
	quit = 1;
}
55 
56 #define NUM_CHUNKS 5
57 
58 void
59 test_messaging(TestInterface *ti_reader, TestInterface *ti_writer)
60 {
61  while (!quit) {
62  int expval = ti_reader->test_int() + 1;
64  unsigned int msgid = ti_reader->msgq_enqueue(m);
65  printf("Sent with message ID %u\n", msgid);
66 
67  if (ti_writer->msgq_size() > 1) {
68  cout << "Error, more than one message! flushing." << endl;
69  ti_writer->msgq_flush();
70  }
71 
72  usleep(100000);
73 
74  if (ti_writer->msgq_first() != NULL) {
76  TestInterface::SetTestStringMessage *msg = ti_writer->msgq_first(msg);
77  printf(
78  "Received message of ID %u, Message improperly detected to be a SetTestStringMessage\n",
79  msg->id());
80  }
84  printf("Received message with ID %u (enqueue time: %s)\n",
85  m2->id(),
86  m2->time_enqueued()->str());
87  ti_writer->set_test_int(m2->test_int());
88  try {
89  ti_writer->write();
90  } catch (InterfaceWriteDeniedException &e) {
91  cout << "BUG: caught write denied exception" << endl;
92  e.print_trace();
93  }
94  ti_writer->msgq_pop();
95  } else {
96  cout << "Illegal message '" << ti_writer->msgq_first()->type() << "' type received" << endl;
97  }
98 
99  usleep(100000);
100 
101  //cout << "Reading value from reader interface.. " << flush;
102  ti_reader->read();
103  int val = ti_reader->test_int();
104  if (val == expval) {
105  //cout << " success, value is " << ti_reader->test_int() << " as expected" << endl;
106  } else {
107  cout << " failure, value is " << ti_reader->test_int() << ", expected " << expval << endl;
108  }
109  } else {
110  printf("No message in queue, if network test this means the message was dropped\n");
111  }
112 
113  usleep(10);
114  }
115 }
116 
118 {
119 public:
121  fawkes::Interface * writer,
122  fawkes::BlackBoard *reader_bb,
123  fawkes::BlackBoard *writer_bb)
124  : BlackBoardInterfaceListener("SyncInterfaceListener(%s-%s)", writer->uid(), reader->id())
125  {
126  reader_ = reader;
127  writer_ = writer;
128  reader_bb_ = reader_bb;
129  writer_bb_ = writer_bb;
130 
131  bbil_add_data_interface(reader_);
132  bbil_add_message_interface(writer_);
133 
134  reader_bb_->register_listener(this);
135  writer_bb_->register_listener(this);
136  }
137 
138  /** Destructor. */
140  {
141  reader_bb_->unregister_listener(this);
142  writer_bb_->unregister_listener(this);
143  }
144 
145  bool
146  bb_interface_message_received(Interface *interface, Message *message) noexcept
147  {
148  try {
149  if (interface == writer_) {
150  printf("%s: Forwarding message\n", bbil_name());
151  Message *m = message->clone();
152  m->set_hops(message->hops());
153  m->ref();
154  reader_->msgq_enqueue(m);
155  message->set_id(m->id());
156  m->unref();
157  return false;
158  } else {
159  // Don't know why we were called, let 'em enqueue
160  printf("%s: Message received for unknown interface\n", bbil_name());
161  return true;
162  }
163  } catch (Exception &e) {
164  printf("%s: Exception when message received\n", bbil_name());
165  e.print_trace();
166  return false;
167  }
168  }
169 
170  void
171  bb_interface_data_refreshed(Interface *interface) noexcept
172  {
173  try {
174  if (interface == reader_) {
175  //logger_->log_debug(bbil_name(), "Copying data");
176  reader_->read();
177  writer_->copy_values(reader_);
178  writer_->write();
179  } else {
180  // Don't know why we were called, let 'em enqueue
181  printf("%s: Data changed for unknown interface", bbil_name());
182  }
183  } catch (Exception &e) {
184  printf("%s: Exception when data changed\n", bbil_name());
185  e.print_trace();
186  }
187  }
188 
189 private:
190  fawkes::Interface *writer_;
191  fawkes::Interface *reader_;
192 
193  fawkes::BlackBoard *writer_bb_;
194  fawkes::BlackBoard *reader_bb_;
195 };
196 
197 int
198 main(int argc, char **argv)
199 {
200  signal(SIGINT, signal_handler);
201 
202  LocalBlackBoard *llbb = new LocalBlackBoard(BLACKBOARD_MEMSIZE);
203  BlackBoard * lbb = llbb;
204 
206  fns->start();
207 
208  llbb->start_nethandler(fns);
209 
210  BlackBoard *rbb = new RemoteBlackBoard("localhost", 1910);
211 
212  InterfaceInfoList *infl = rbb->list_all();
213  for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) {
214  const unsigned char *hash = (*i).hash();
215  char phash[INTERFACE_HASH_SIZE_ * 2 + 1];
216  memset(phash, 0, sizeof(phash));
217  for (unsigned int j = 0; j < INTERFACE_HASH_SIZE_; ++j) {
218  sprintf(&phash[j * 2], "%02x", hash[j]);
219  }
220  printf("%s::%s (%s), w:%i r:%u s:%u\n",
221  (*i).type(),
222  (*i).id(),
223  phash,
224  (*i).has_writer(),
225  (*i).num_readers(),
226  (*i).serial());
227  }
228  delete infl;
229 
230  //TestInterface *ti_writer;
231  TestInterface *ti_reader;
232  TestInterface *ti_writer;
233  try {
234  cout << "Opening interfaces.. " << flush;
235  ti_writer = rbb->open_for_writing<TestInterface>("SomeID");
236  ti_reader = rbb->open_for_reading<TestInterface>("SomeID");
237  cout << "success, "
238  << "writer hash=" << ti_writer->hash_printable()
239  << " reader hash=" << ti_reader->hash_printable() << endl;
240  } catch (Exception &e) {
241  cout << "failed! Aborting" << endl;
242  e.print_trace();
243  exit(1);
244  }
245 
246  try {
247  cout << "Trying to open second writer.. " << flush;
248  TestInterface *ti_writer_two;
249  ti_writer_two = rbb->open_for_writing<TestInterface>("SomeID");
250  rbb->close(ti_writer_two);
251  cout << "BUG: Detection of second writer did NOT work!" << endl;
252  exit(2);
253  } catch (BlackBoardWriterActiveException &e) {
254  cout << "exception caught as expected, detected and prevented second writer!" << endl;
255  }
256 
257  try {
258  cout << "Trying to open third writer.. " << flush;
259  TestInterface *ti_writer_three;
260  ti_writer_three = rbb->open_for_writing<TestInterface>("AnotherID");
261  cout << "No exception as expected, different ID ok!" << endl;
262  rbb->close(ti_writer_three);
263  } catch (BlackBoardWriterActiveException &e) {
264  cout << "BUG: Third writer with different ID detected as another writer!" << endl;
265  exit(3);
266  }
267 
268  cout << endl
269  << endl
270  << "Running data tests ==================================================" << endl;
271 
272  cout << "Writing initial value (" << TestInterface::TEST_CONSTANT << ") into interface as TestInt"
273  << endl;
274  ti_writer->set_test_int(TestInterface::TEST_CONSTANT);
275  try {
276  ti_writer->write();
277  } catch (InterfaceWriteDeniedException &e) {
278  cout << "BUG: caught write denied exception" << endl;
279  e.print_trace();
280  }
281 
282  cout << "Giving some time to have value processed" << endl;
283  usleep(100000);
284 
285  cout << "Reading value from reader interface.. " << flush;
286  ti_reader->read();
287  int val = ti_reader->test_int();
288  if (val == TestInterface::TEST_CONSTANT) {
289  cout << " success, value is " << ti_reader->test_int() << " as expected" << endl;
290  } else {
291  cout << " failure, value is " << ti_reader->test_int() << ", expected "
292  << TestInterface::TEST_CONSTANT << endl;
293  }
294 
295  cout << "Closing interfaces.. " << flush;
296  try {
297  rbb->close(ti_reader);
298  rbb->close(ti_writer);
299  cout << "done" << endl;
300  } catch (Exception &e) {
301  cout << "failed" << endl;
302  e.print_trace();
303  }
304 
305  cout << endl
306  << endl
307  << "Starting MESSAGING tests" << endl
308  << "Press Ctrl-C to continue with next test" << endl
309  << endl;
310 
311  ti_writer = lbb->open_for_writing<TestInterface>("Messaging");
312  ti_reader = rbb->open_for_reading<TestInterface>("Messaging");
313 
314  printf("Writer serial: %u shifted: %u\n", ti_writer->serial(), ti_writer->serial() << 16);
315  printf("Reader serial: %u shifted: %u\n", ti_reader->serial(), ti_reader->serial() << 16);
316 
317  test_messaging(ti_reader, ti_writer);
318 
319  rbb->close(ti_reader);
320  lbb->close(ti_writer);
321 
322  cout << endl
323  << endl
324  << "Starting MESSAGING tests, doing repeater scenario" << endl
325  << "Press Ctrl-C to continue with next test" << endl
326  << endl;
327  quit = false;
328 
329  delete rbb;
330 
331  LocalBlackBoard *repllbb = new LocalBlackBoard(BLACKBOARD_MEMSIZE);
332 
334  repfns->start();
335 
336  repllbb->start_nethandler(repfns);
337 
338  BlackBoard *rep_rbb = new RemoteBlackBoard("localhost", 1911);
339  rbb = new RemoteBlackBoard("localhost", 1911);
340 
341  TestInterface *rep_reader;
342  TestInterface *rep_writer;
343 
344  ti_writer = rbb->open_for_writing<TestInterface>("Messaging");
345  ti_reader = lbb->open_for_reading<TestInterface>("Messaging");
346 
347  rep_reader = rep_rbb->open_for_reading<TestInterface>("Messaging");
348  rep_writer = lbb->open_for_writing<TestInterface>("Messaging");
349 
350  printf("Writer serial: %u shifted: %u\n", ti_writer->serial(), ti_writer->serial() << 16);
351  printf("Reader serial: %u shifted: %u\n", ti_reader->serial(), ti_reader->serial() << 16);
352 
353  SyncInterfaceListener *sil = new SyncInterfaceListener(rep_reader, rep_writer, rep_rbb, lbb);
354 
355  test_messaging(ti_reader, ti_writer);
356 
357  delete sil;
358  lbb->close(ti_reader);
359  rbb->close(ti_writer);
360  rep_rbb->close(rep_reader);
361  lbb->close(rep_writer);
362  delete repllbb;
363  delete rep_rbb;
364 
365  cout << "Tests done" << endl;
366 
367  delete rbb;
368  delete llbb;
369  delete fns;
370 }
371 
372 /// @endcond
Synchronize two interfaces.
Definition: sync_listener.h:34
BlackBoard interface listener.
Thrown if a writer is already active on an interface that writing has been requested for.
Definition: exceptions.h:125
The BlackBoard abstract class.
Definition: blackboard.h:46
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for reading.
virtual InterfaceInfoList * list_all()=0
Get list of all currently existing interfaces.
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual void close(Interface *interface)=0
Close interface.
Base class for exceptions in Fawkes.
Definition: exception.h:36
void print_trace() noexcept
Prints trace to stderr.
Definition: exception.cpp:601
Fawkes Network Thread.
Definition: server_thread.h:49
Interface information list.
This exception is thrown if a write has been attempted on a read-only interface.
Definition: interface.h:56
Base class for all Fawkes BlackBoard interfaces.
Definition: interface.h:80
bool msgq_first_is()
Check if first message has desired type.
Definition: interface.h:351
const char * hash_printable() const
Get printable interface hash.
Definition: interface.cpp:314
void msgq_pop()
Erase first message from queue.
Definition: interface.cpp:1215
Message * msgq_first()
Get the first message from the message queue.
Definition: interface.cpp:1200
unsigned int msgq_enqueue(Message *message, bool proxy=false)
Enqueue message at end of queue.
Definition: interface.cpp:915
unsigned int msgq_size()
Get size of message queue.
Definition: interface.cpp:1045
void write()
Write from local copy into BlackBoard memory.
Definition: interface.cpp:501
Uuid serial() const
Get instance serial of interface.
Definition: interface.cpp:695
void msgq_flush()
Flush all messages.
Definition: interface.cpp:1079
void read()
Read from BlackBoard into local copy.
Definition: interface.cpp:479
Local BlackBoard.
Definition: local.h:45
virtual void start_nethandler(FawkesNetworkHub *hub)
Start network handler.
Definition: local.cpp:194
Base class for all messages passed through interfaces in Fawkes BlackBoard.
Definition: message.h:44
const char * type() const
Get message type.
Definition: message.cpp:381
const Time * time_enqueued() const
Get time when message was enqueued.
Definition: message.cpp:262
void set_hops(unsigned int hops)
Set number of hops.
Definition: message.cpp:227
unsigned int id() const
Get message ID.
Definition: message.cpp:181
void unref()
Decrement reference count and conditionally delete this instance.
Definition: refcount.cpp:95
void ref()
Increment reference count.
Definition: refcount.cpp:67
Remote BlackBoard.
Definition: remote.h:50
SetTestIntMessage Fawkes BlackBoard Interface Message.
Definition: TestInterface.h:69
virtual Message * clone() const
Clone this message.
int32_t test_int() const
Get test_int value.
SetTestStringMessage Fawkes BlackBoard Interface Message.
Definition: TestInterface.h:95
TestInterface Fawkes BlackBoard Interface.
Definition: TestInterface.h:34
void set_test_int(const int32_t new_test_int)
Set test_int value.
int32_t test_int() const
Get test_int value.
void start(bool wait=true)
Call this method to start the thread.
Definition: thread.cpp:499
const char * str(bool utc=false) const
Output function.
Definition: time.cpp:790
Fawkes library namespace.