37 #include <core/threading/mutex_locker.h>
38 #include <google/protobuf/descriptor.h>
39 #include <logging/logger.h>
40 #include <protobuf_clips/communicator.h>
41 #include <protobuf_comm/client.h>
42 #include <protobuf_comm/peer.h>
43 #include <protobuf_comm/server.h>
45 #include <boost/format.hpp>
47 using namespace google::protobuf;
48 using namespace protobuf_comm;
49 using namespace boost::placeholders;
51 namespace protobuf_clips {
67 ClipsProtobufCommunicator::ClipsProtobufCommunicator(CLIPS::Environment *env,
70 : clips_(env), clips_mutex_(env_mutex), logger_(logger), server_(NULL), next_client_id_(0)
72 message_register_ =
new MessageRegister();
84 std::vector<std::string> &proto_path,
86 : clips_(env), clips_mutex_(env_mutex), logger_(logger), server_(NULL), next_client_id_(0)
88 message_register_ =
new MessageRegister(proto_path);
98 for (
auto f : functions_) {
99 clips_->remove_function(f);
104 for (
auto c : clients_) {
109 delete message_register_;
/** Register a function with the CLIPS environment and record its name.
 * Expands to a call of clips_->add_function(n, s) followed by
 * functions_.push_back(n); functions_ is walked elsewhere in this file to
 * call clips_->remove_function() on each recorded name.
 *
 * The body is wrapped in do { ... } while (0) so that the expansion is one
 * single statement: it stays intact inside an unbraced if/else and requires
 * the caller to terminate it with a semicolon, as all call sites already do.
 *
 * @param n name under which the function is registered; it is expanded
 *          twice, so it must not have side effects
 * @param s callable (slot) handed to add_function(); pass it parenthesized
 *          if it contains top-level commas
 */
#define ADD_FUNCTION(n, s)              \
	do {                                \
		clips_->add_function(n, s);     \
		functions_.push_back(n);        \
	} while (0)
119 ClipsProtobufCommunicator::setup_clips()
123 ADD_FUNCTION(
"pb-register-type",
124 (sigc::slot<CLIPS::Value, std::string>(
125 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_register_type))));
126 ADD_FUNCTION(
"pb-field-names",
127 (sigc::slot<CLIPS::Values, void *>(
128 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_field_names))));
129 ADD_FUNCTION(
"pb-field-type",
130 (sigc::slot<CLIPS::Value, void *, std::string>(
131 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_field_type))));
132 ADD_FUNCTION(
"pb-has-field",
133 (sigc::slot<CLIPS::Value, void *, std::string>(
134 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_has_field))));
135 ADD_FUNCTION(
"pb-field-label",
136 (sigc::slot<CLIPS::Value, void *, std::string>(
137 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_field_label))));
138 ADD_FUNCTION(
"pb-field-value",
139 (sigc::slot<CLIPS::Value, void *, std::string>(
140 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_field_value))));
141 ADD_FUNCTION(
"pb-field-list",
142 (sigc::slot<CLIPS::Values, void *, std::string>(
143 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_field_list))));
144 ADD_FUNCTION(
"pb-field-is-list",
145 (sigc::slot<CLIPS::Value, void *, std::string>(
146 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_field_is_list))));
147 ADD_FUNCTION(
"pb-create",
148 (sigc::slot<CLIPS::Value, std::string>(
149 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_create))));
150 ADD_FUNCTION(
"pb-destroy",
151 (sigc::slot<void, void *>(
152 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_destroy))));
153 ADD_FUNCTION(
"pb-ref",
154 (sigc::slot<CLIPS::Value, void *>(
155 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_ref))));
156 ADD_FUNCTION(
"pb-set-field",
157 (sigc::slot<void, void *, std::string, CLIPS::Value>(
158 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_set_field))));
159 ADD_FUNCTION(
"pb-add-list",
160 (sigc::slot<void, void *, std::string, CLIPS::Value>(
161 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_add_list))));
162 ADD_FUNCTION(
"pb-send",
163 (sigc::slot<void, long int, void *>(
164 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_send))));
165 ADD_FUNCTION(
"pb-tostring",
166 (sigc::slot<std::string, void *>(
167 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_tostring))));
168 ADD_FUNCTION(
"pb-server-enable",
169 (sigc::slot<void, int>(
171 ADD_FUNCTION(
"pb-server-disable",
174 ADD_FUNCTION(
"pb-peer-create",
175 (sigc::slot<long int, std::string, int>(
176 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_peer_create))));
177 ADD_FUNCTION(
"pb-peer-create-local",
178 (sigc::slot<long int, std::string, int, int>(
179 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_peer_create_local))));
180 ADD_FUNCTION(
"pb-peer-create-crypto",
181 (sigc::slot<long int, std::string, int, std::string, std::string>(
182 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_peer_create_crypto))));
183 ADD_FUNCTION(
"pb-peer-create-local-crypto",
184 (sigc::slot<long int, std::string, int, int, std::string, std::string>(sigc::mem_fun(
185 *
this, &ClipsProtobufCommunicator::clips_pb_peer_create_local_crypto))));
186 ADD_FUNCTION(
"pb-peer-destroy",
187 (sigc::slot<void, long int>(
188 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_peer_destroy))));
189 ADD_FUNCTION(
"pb-peer-setup-crypto",
190 (sigc::slot<void, long int, std::string, std::string>(
191 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_peer_setup_crypto))));
192 ADD_FUNCTION(
"pb-broadcast",
193 (sigc::slot<void, long int, void *>(
194 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_broadcast))));
195 ADD_FUNCTION(
"pb-connect",
196 (sigc::slot<long int, std::string, int>(
197 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_client_connect))));
198 ADD_FUNCTION(
"pb-disconnect",
199 (sigc::slot<void, long int>(
200 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_disconnect))));
209 if ((port > 0) && !server_) {
210 server_ =
new protobuf_comm::ProtobufStreamServer(port, message_register_);
212 server_->signal_connected().connect(
213 boost::bind(&ClipsProtobufCommunicator::handle_server_client_connected,
this, _1, _2));
214 server_->signal_disconnected().connect(
215 boost::bind(&ClipsProtobufCommunicator::handle_server_client_disconnected,
this, _1, _2));
216 server_->signal_received().connect(
217 boost::bind(&ClipsProtobufCommunicator::handle_server_client_msg,
this, _1, _2, _3, _4));
218 server_->signal_receive_failed().connect(
219 boost::bind(&ClipsProtobufCommunicator::handle_server_client_fail,
this, _1, _2, _3, _4));
240 ClipsProtobufCommunicator::clips_pb_peer_create_local_crypto(std::string address,
243 std::string crypto_key,
247 recv_port = send_port;
250 protobuf_comm::ProtobufBroadcastPeer *peer =
new protobuf_comm::ProtobufBroadcastPeer(
251 address, send_port, recv_port, message_register_, crypto_key, cipher);
256 peer_id = ++next_client_id_;
257 peers_[peer_id] = peer;
260 peer->signal_received().connect(
261 boost::bind(&ClipsProtobufCommunicator::handle_peer_msg,
this, peer_id, _1, _2, _3, _4));
262 peer->signal_recv_error().connect(
263 boost::bind(&ClipsProtobufCommunicator::handle_peer_recv_error,
this, peer_id, _1, _2));
264 peer->signal_send_error().connect(
265 boost::bind(&ClipsProtobufCommunicator::handle_peer_send_error,
this, peer_id, _1));
281 ClipsProtobufCommunicator::clips_pb_peer_create_crypto(std::string address,
283 std::string crypto_key,
286 return clips_pb_peer_create_local_crypto(address, port, port, crypto_key, cipher);
295 ClipsProtobufCommunicator::clips_pb_peer_create(std::string address,
int port)
297 return clips_pb_peer_create_local_crypto(address, port, port);
307 ClipsProtobufCommunicator::clips_pb_peer_create_local(std::string address,
311 return clips_pb_peer_create_local_crypto(address, send_port, recv_port);
318 ClipsProtobufCommunicator::clips_pb_peer_destroy(
long int peer_id)
320 if (peers_.find(peer_id) != peers_.end()) {
321 delete peers_[peer_id];
322 peers_.erase(peer_id);
332 ClipsProtobufCommunicator::clips_pb_peer_setup_crypto(
long int peer_id,
333 std::string crypto_key,
336 if (peers_.find(peer_id) != peers_.end()) {
337 peers_[peer_id]->setup_crypto(crypto_key, cipher);
346 ClipsProtobufCommunicator::clips_pb_register_type(std::string full_name)
349 message_register_->add_message_type(full_name);
350 return CLIPS::Value(
"TRUE", CLIPS::TYPE_SYMBOL);
351 }
catch (std::runtime_error &e) {
354 "Registering type %s failed: %s",
358 return CLIPS::Value(
"FALSE", CLIPS::TYPE_SYMBOL);
363 ClipsProtobufCommunicator::clips_pb_create(std::string full_name)
366 std::shared_ptr<google::protobuf::Message> m = message_register_->new_message_for(full_name);
367 return CLIPS::Value(
new std::shared_ptr<google::protobuf::Message>(m));
368 }
catch (std::runtime_error &e) {
371 "Cannot create message of type %s: %s",
375 return CLIPS::Value(
new std::shared_ptr<google::protobuf::Message>());
380 ClipsProtobufCommunicator::clips_pb_ref(
void *msgptr)
382 std::shared_ptr<google::protobuf::Message> *m =
383 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
385 return new std::shared_ptr<google::protobuf::Message>();
387 return CLIPS::Value(
new std::shared_ptr<google::protobuf::Message>(*m));
391 ClipsProtobufCommunicator::clips_pb_destroy(
void *msgptr)
393 std::shared_ptr<google::protobuf::Message> *m =
394 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
402 ClipsProtobufCommunicator::clips_pb_field_names(
void *msgptr)
404 std::shared_ptr<google::protobuf::Message> *m =
405 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
407 return CLIPS::Values();
409 const Descriptor *desc = (*m)->GetDescriptor();
410 const int field_count = desc->field_count();
411 CLIPS::Values field_names(field_count);
412 for (
int i = 0; i < field_count; ++i) {
413 field_names[i].set(desc->field(i)->name(),
true);
419 ClipsProtobufCommunicator::clips_pb_field_type(
void *msgptr, std::string field_name)
421 std::shared_ptr<google::protobuf::Message> *m =
422 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
424 return CLIPS::Value(
"INVALID-MESSAGE", CLIPS::TYPE_SYMBOL);
426 const Descriptor * desc = (*m)->GetDescriptor();
427 const FieldDescriptor *field = desc->FindFieldByName(field_name);
429 return CLIPS::Value(
"DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL);
431 switch (field->type()) {
432 case FieldDescriptor::TYPE_DOUBLE:
return CLIPS::Value(
"DOUBLE", CLIPS::TYPE_SYMBOL);
433 case FieldDescriptor::TYPE_FLOAT:
return CLIPS::Value(
"FLOAT", CLIPS::TYPE_SYMBOL);
434 case FieldDescriptor::TYPE_INT64:
return CLIPS::Value(
"INT64", CLIPS::TYPE_SYMBOL);
435 case FieldDescriptor::TYPE_UINT64:
return CLIPS::Value(
"UINT64", CLIPS::TYPE_SYMBOL);
436 case FieldDescriptor::TYPE_INT32:
return CLIPS::Value(
"INT32", CLIPS::TYPE_SYMBOL);
437 case FieldDescriptor::TYPE_FIXED64:
return CLIPS::Value(
"FIXED64", CLIPS::TYPE_SYMBOL);
438 case FieldDescriptor::TYPE_FIXED32:
return CLIPS::Value(
"FIXED32", CLIPS::TYPE_SYMBOL);
439 case FieldDescriptor::TYPE_BOOL:
return CLIPS::Value(
"BOOL", CLIPS::TYPE_SYMBOL);
440 case FieldDescriptor::TYPE_STRING:
return CLIPS::Value(
"STRING", CLIPS::TYPE_SYMBOL);
441 case FieldDescriptor::TYPE_MESSAGE:
return CLIPS::Value(
"MESSAGE", CLIPS::TYPE_SYMBOL);
442 case FieldDescriptor::TYPE_BYTES:
return CLIPS::Value(
"BYTES", CLIPS::TYPE_SYMBOL);
443 case FieldDescriptor::TYPE_UINT32:
return CLIPS::Value(
"UINT32", CLIPS::TYPE_SYMBOL);
444 case FieldDescriptor::TYPE_ENUM:
return CLIPS::Value(
"ENUM", CLIPS::TYPE_SYMBOL);
445 case FieldDescriptor::TYPE_SFIXED32:
return CLIPS::Value(
"SFIXED32", CLIPS::TYPE_SYMBOL);
446 case FieldDescriptor::TYPE_SFIXED64:
return CLIPS::Value(
"SFIXED64", CLIPS::TYPE_SYMBOL);
447 case FieldDescriptor::TYPE_SINT32:
return CLIPS::Value(
"SINT32", CLIPS::TYPE_SYMBOL);
448 case FieldDescriptor::TYPE_SINT64:
return CLIPS::Value(
"SINT64", CLIPS::TYPE_SYMBOL);
449 default:
return CLIPS::Value(
"UNKNOWN", CLIPS::TYPE_SYMBOL);
454 ClipsProtobufCommunicator::clips_pb_has_field(
void *msgptr, std::string field_name)
456 std::shared_ptr<google::protobuf::Message> *m =
457 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
461 const Descriptor * desc = (*m)->GetDescriptor();
462 const FieldDescriptor *field = desc->FindFieldByName(field_name);
466 const Reflection *refl = (*m)->GetReflection();
468 if (field->is_repeated()) {
469 return CLIPS::Value((refl->FieldSize(**m, field) > 0) ?
"TRUE" :
"FALSE", CLIPS::TYPE_SYMBOL);
470 }
else if (field->is_optional()) {
471 return CLIPS::Value(refl->HasField(**m, field) ?
"TRUE" :
"FALSE", CLIPS::TYPE_SYMBOL);
473 return CLIPS::Value(
"TRUE", CLIPS::TYPE_SYMBOL);
478 ClipsProtobufCommunicator::clips_pb_field_label(
void *msgptr, std::string field_name)
480 std::shared_ptr<google::protobuf::Message> *m =
481 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
483 return CLIPS::Value(
"INVALID-MESSAGE", CLIPS::TYPE_SYMBOL);
485 const Descriptor * desc = (*m)->GetDescriptor();
486 const FieldDescriptor *field = desc->FindFieldByName(field_name);
488 return CLIPS::Value(
"DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL);
490 switch (field->label()) {
491 case FieldDescriptor::LABEL_OPTIONAL:
return CLIPS::Value(
"OPTIONAL", CLIPS::TYPE_SYMBOL);
492 case FieldDescriptor::LABEL_REQUIRED:
return CLIPS::Value(
"REQUIRED", CLIPS::TYPE_SYMBOL);
493 case FieldDescriptor::LABEL_REPEATED:
return CLIPS::Value(
"REPEATED", CLIPS::TYPE_SYMBOL);
494 default:
return CLIPS::Value(
"UNKNOWN", CLIPS::TYPE_SYMBOL);
499 ClipsProtobufCommunicator::clips_pb_field_value(
void *msgptr, std::string field_name)
501 std::shared_ptr<google::protobuf::Message> *m =
502 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
505 logger_->
log_warn(
"CLIPS-Protobuf",
"Invalid message when setting %s", field_name.c_str());
507 return CLIPS::Value(
"INVALID-MESSAGE", CLIPS::TYPE_SYMBOL);
510 const Descriptor * desc = (*m)->GetDescriptor();
511 const FieldDescriptor *field = desc->FindFieldByName(field_name);
515 "Field %s of %s does not exist",
517 (*m)->GetTypeName().c_str());
519 return CLIPS::Value(
"DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL);
521 const Reflection *refl = (*m)->GetReflection();
522 if (field->type() != FieldDescriptor::TYPE_MESSAGE && !refl->HasField(**m, field)) {
525 "Field %s of %s not set",
527 (*m)->GetTypeName().c_str());
529 return CLIPS::Value(
"NOT-SET", CLIPS::TYPE_SYMBOL);
531 switch (field->type()) {
532 case FieldDescriptor::TYPE_DOUBLE:
return CLIPS::Value(refl->GetDouble(**m, field));
533 case FieldDescriptor::TYPE_FLOAT:
return CLIPS::Value(refl->GetFloat(**m, field));
534 case FieldDescriptor::TYPE_INT64:
return CLIPS::Value(refl->GetInt64(**m, field));
535 case FieldDescriptor::TYPE_UINT64:
return CLIPS::Value((
long int)refl->GetUInt64(**m, field));
536 case FieldDescriptor::TYPE_INT32:
return CLIPS::Value(refl->GetInt32(**m, field));
537 case FieldDescriptor::TYPE_FIXED64:
return CLIPS::Value((
long int)refl->GetUInt64(**m, field));
538 case FieldDescriptor::TYPE_FIXED32:
return CLIPS::Value(refl->GetUInt32(**m, field));
539 case FieldDescriptor::TYPE_BOOL:
541 if (refl->GetBool(**m, field)) {
542 return CLIPS::Value(
"TRUE", CLIPS::TYPE_SYMBOL);
544 return CLIPS::Value(
"FALSE", CLIPS::TYPE_SYMBOL);
546 case FieldDescriptor::TYPE_STRING:
return CLIPS::Value(refl->GetString(**m, field));
547 case FieldDescriptor::TYPE_MESSAGE: {
548 const google::protobuf::Message &mfield = refl->GetMessage(**m, field);
549 google::protobuf::Message * mcopy = mfield.New();
550 mcopy->CopyFrom(mfield);
551 void *ptr =
new std::shared_ptr<google::protobuf::Message>(mcopy);
552 return CLIPS::Value(ptr);
554 case FieldDescriptor::TYPE_BYTES:
return CLIPS::Value((
char *)
"bytes");
555 case FieldDescriptor::TYPE_UINT32:
return CLIPS::Value(refl->GetUInt32(**m, field));
556 case FieldDescriptor::TYPE_ENUM:
557 return CLIPS::Value(refl->GetEnum(**m, field)->name(), CLIPS::TYPE_SYMBOL);
558 case FieldDescriptor::TYPE_SFIXED32:
return CLIPS::Value(refl->GetInt32(**m, field));
559 case FieldDescriptor::TYPE_SFIXED64:
return CLIPS::Value(refl->GetInt64(**m, field));
560 case FieldDescriptor::TYPE_SINT32:
return CLIPS::Value(refl->GetInt32(**m, field));
561 case FieldDescriptor::TYPE_SINT64:
return CLIPS::Value(refl->GetInt64(**m, field));
562 default:
throw std::logic_error(
"Unknown protobuf field type encountered");
567 ClipsProtobufCommunicator::clips_pb_set_field(
void * msgptr,
568 std::string field_name,
571 std::shared_ptr<google::protobuf::Message> *m =
572 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
576 const Descriptor * desc = (*m)->GetDescriptor();
577 const FieldDescriptor *field = desc->FindFieldByName(field_name);
580 logger_->
log_warn(
"CLIPS-Protobuf",
"Could not find field %s", field_name.c_str());
584 const Reflection *refl = (*m)->GetReflection();
587 switch (field->type()) {
588 case FieldDescriptor::TYPE_DOUBLE: refl->SetDouble(m->get(), field, value.as_float());
break;
589 case FieldDescriptor::TYPE_FLOAT: refl->SetFloat(m->get(), field, value.as_float());
break;
590 case FieldDescriptor::TYPE_SFIXED64:
591 case FieldDescriptor::TYPE_SINT64:
592 case FieldDescriptor::TYPE_INT64: refl->SetInt64(m->get(), field, value.as_integer());
break;
593 case FieldDescriptor::TYPE_FIXED64:
594 case FieldDescriptor::TYPE_UINT64: refl->SetUInt64(m->get(), field, value.as_integer());
break;
595 case FieldDescriptor::TYPE_SFIXED32:
596 case FieldDescriptor::TYPE_SINT32:
597 case FieldDescriptor::TYPE_INT32: refl->SetInt32(m->get(), field, value.as_integer());
break;
598 case FieldDescriptor::TYPE_BOOL: refl->SetBool(m->get(), field, (value ==
"TRUE"));
break;
599 case FieldDescriptor::TYPE_STRING: refl->SetString(m->get(), field, value.as_string());
break;
600 case FieldDescriptor::TYPE_MESSAGE: {
601 std::shared_ptr<google::protobuf::Message> *mfrom =
602 static_cast<std::shared_ptr<google::protobuf::Message> *
>(value.as_address());
603 Message *mut_msg = refl->MutableMessage(m->get(), field);
604 mut_msg->CopyFrom(**mfrom);
607 case FieldDescriptor::TYPE_BYTES:
break;
608 case FieldDescriptor::TYPE_FIXED32:
609 case FieldDescriptor::TYPE_UINT32: refl->SetUInt32(m->get(), field, value.as_integer());
break;
610 case FieldDescriptor::TYPE_ENUM: {
611 const EnumDescriptor * enumdesc = field->enum_type();
612 const EnumValueDescriptor *enumval = enumdesc->FindValueByName(value);
614 refl->SetEnum(m->get(), field, enumval);
618 "%s: cannot set invalid "
619 "enum value '%s' on '%s'",
620 (*m)->GetTypeName().c_str(),
621 value.as_string().c_str(),
626 default:
throw std::logic_error(
"Unknown protobuf field type encountered");
628 }
catch (std::logic_error &e) {
631 "Failed to set field %s of %s: %s "
632 "(type %d, as string %s)",
634 (*m)->GetTypeName().c_str(),
637 to_string(value).c_str());
643 ClipsProtobufCommunicator::clips_pb_add_list(
void * msgptr,
644 std::string field_name,
647 std::shared_ptr<google::protobuf::Message> *m =
648 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
652 const Descriptor * desc = (*m)->GetDescriptor();
653 const FieldDescriptor *field = desc->FindFieldByName(field_name);
656 logger_->
log_warn(
"CLIPS-Protobuf",
"Could not find field %s", field_name.c_str());
660 const Reflection *refl = (*m)->GetReflection();
663 switch (field->type()) {
664 case FieldDescriptor::TYPE_DOUBLE: refl->AddDouble(m->get(), field, value);
break;
665 case FieldDescriptor::TYPE_FLOAT: refl->AddFloat(m->get(), field, value);
break;
666 case FieldDescriptor::TYPE_SFIXED64:
667 case FieldDescriptor::TYPE_SINT64:
668 case FieldDescriptor::TYPE_INT64: refl->AddInt64(m->get(), field, value);
break;
669 case FieldDescriptor::TYPE_FIXED64:
670 case FieldDescriptor::TYPE_UINT64: refl->AddUInt64(m->get(), field, (
long int)value);
break;
671 case FieldDescriptor::TYPE_SFIXED32:
672 case FieldDescriptor::TYPE_SINT32:
673 case FieldDescriptor::TYPE_INT32: refl->AddInt32(m->get(), field, value);
break;
674 case FieldDescriptor::TYPE_BOOL: refl->AddBool(m->get(), field, (value ==
"TRUE"));
break;
675 case FieldDescriptor::TYPE_STRING: refl->AddString(m->get(), field, value);
break;
676 case FieldDescriptor::TYPE_MESSAGE: {
677 std::shared_ptr<google::protobuf::Message> *mfrom =
678 static_cast<std::shared_ptr<google::protobuf::Message> *
>(value.as_address());
679 Message *new_msg = refl->AddMessage(m->get(), field);
680 new_msg->CopyFrom(**mfrom);
683 case FieldDescriptor::TYPE_BYTES:
break;
684 case FieldDescriptor::TYPE_FIXED32:
685 case FieldDescriptor::TYPE_UINT32: refl->AddUInt32(m->get(), field, value);
break;
686 case FieldDescriptor::TYPE_ENUM: {
687 const EnumDescriptor * enumdesc = field->enum_type();
688 const EnumValueDescriptor *enumval = enumdesc->FindValueByName(value);
690 refl->AddEnum(m->get(), field, enumval);
692 default:
throw std::logic_error(
"Unknown protobuf field type encountered");
694 }
catch (std::logic_error &e) {
697 "Failed to add field %s of %s: %s",
699 (*m)->GetTypeName().c_str(),
706 ClipsProtobufCommunicator::clips_pb_client_connect(std::string host,
int port)
711 ProtobufStreamClient *client =
new ProtobufStreamClient(message_register_);
716 client_id = ++next_client_id_;
717 clients_[client_id] = client;
720 client->signal_connected().connect(
721 boost::bind(&ClipsProtobufCommunicator::handle_client_connected,
this, client_id));
722 client->signal_disconnected().connect(
723 boost::bind(&ClipsProtobufCommunicator::handle_client_disconnected,
726 boost::asio::placeholders::error));
727 client->signal_received().connect(
728 boost::bind(&ClipsProtobufCommunicator::handle_client_msg,
this, client_id, _1, _2, _3));
729 client->signal_receive_failed().connect(boost::bind(
730 &ClipsProtobufCommunicator::handle_client_receive_fail,
this, client_id, _1, _2, _3));
732 client->async_connect(host.c_str(), port);
733 return CLIPS::Value(client_id);
737 ClipsProtobufCommunicator::clips_pb_send(
long int client_id,
void *msgptr)
739 std::shared_ptr<google::protobuf::Message> *m =
740 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
743 logger_->
log_warn(
"CLIPS-Protobuf",
"Cannot send to %li: invalid message", client_id);
751 if (server_ && server_clients_.find(client_id) != server_clients_.end()) {
753 server_->send(server_clients_[client_id], *m);
754 sig_server_sent_(server_clients_[client_id], *m);
755 }
else if (clients_.find(client_id) != clients_.end()) {
757 clients_[client_id]->send(*m);
758 std::pair<std::string, unsigned short> &client_endpoint = client_endpoints_[client_id];
759 sig_client_sent_(client_endpoint.first, client_endpoint.second, *m);
760 }
else if (peers_.find(client_id) != peers_.end()) {
762 peers_[client_id]->send(*m);
763 sig_peer_sent_(client_id, *m);
768 }
catch (google::protobuf::FatalException &e) {
771 "Failed to send message of type %s: %s",
772 (*m)->GetTypeName().c_str(),
778 "Failed to send message of type %s: %s",
779 (*m)->GetTypeName().c_str(),
782 }
catch (std::runtime_error &e) {
785 "Failed to send message of type %s: %s",
786 (*m)->GetTypeName().c_str(),
793 ClipsProtobufCommunicator::clips_pb_tostring(
void *msgptr)
795 std::shared_ptr<google::protobuf::Message> *m =
796 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
799 logger_->
log_warn(
"CLIPS-Protobuf",
"Cannot convert message to string: invalid message");
804 return (*m)->DebugString();
808 ClipsProtobufCommunicator::clips_pb_broadcast(
long int peer_id,
void *msgptr)
810 std::shared_ptr<google::protobuf::Message> *m =
811 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
814 logger_->
log_warn(
"CLIPS-Protobuf",
"Cannot send broadcast: invalid message");
820 if (peers_.find(peer_id) == peers_.end())
825 peers_[peer_id]->send(*m);
826 }
catch (google::protobuf::FatalException &e) {
829 "Failed to broadcast message of type %s: %s",
830 (*m)->GetTypeName().c_str(),
836 "Failed to broadcast message of type %s: %s",
837 (*m)->GetTypeName().c_str(),
840 }
catch (std::runtime_error &e) {
843 "Failed to broadcast message of type %s: %s",
844 (*m)->GetTypeName().c_str(),
849 sig_peer_sent_(peer_id, *m);
853 ClipsProtobufCommunicator::clips_pb_disconnect(
long int client_id)
860 if (server_clients_.find(client_id) != server_clients_.end()) {
861 protobuf_comm::ProtobufStreamServer::ClientID srv_client = server_clients_[client_id];
862 server_->disconnect(srv_client);
863 server_clients_.erase(client_id);
864 rev_server_clients_.erase(srv_client);
865 }
else if (clients_.find(client_id) != clients_.end()) {
866 delete clients_[client_id];
867 clients_.erase(client_id);
869 }
catch (std::runtime_error &e) {
872 "Failed to disconnect from client %li: %s",
880 ClipsProtobufCommunicator::clips_pb_field_list(
void *msgptr, std::string field_name)
882 std::shared_ptr<google::protobuf::Message> *m =
883 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
885 return CLIPS::Values(1, CLIPS::Value(
"INVALID-MESSAGE", CLIPS::TYPE_SYMBOL));
887 const Descriptor * desc = (*m)->GetDescriptor();
888 const FieldDescriptor *field = desc->FindFieldByName(field_name);
890 return CLIPS::Values(1, CLIPS::Value(
"DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL));
892 if (field->label() == FieldDescriptor::LABEL_REQUIRED
893 || field->label() == FieldDescriptor::LABEL_OPTIONAL) {
894 CLIPS::Values rv(1, clips_pb_field_value(msgptr, field_name));
898 const Reflection *refl = (*m)->GetReflection();
899 int field_size = refl->FieldSize(**m, field);
900 CLIPS::Values rv(field_size);
901 for (
int i = 0; i < field_size; ++i) {
902 switch (field->type()) {
903 case FieldDescriptor::TYPE_DOUBLE:
904 rv[i] = CLIPS::Value(refl->GetRepeatedDouble(**m, field, i));
906 case FieldDescriptor::TYPE_FLOAT:
907 rv[i] = CLIPS::Value(refl->GetRepeatedFloat(**m, field, i));
910 case FieldDescriptor::TYPE_UINT64:
911 case FieldDescriptor::TYPE_FIXED64:
912 rv[i] = CLIPS::Value((
long int)refl->GetRepeatedUInt64(**m, field, i));
914 case FieldDescriptor::TYPE_UINT32:
915 case FieldDescriptor::TYPE_FIXED32:
916 rv[i] = CLIPS::Value(refl->GetRepeatedUInt32(**m, field, i));
918 case FieldDescriptor::TYPE_BOOL:
920 if (refl->GetRepeatedBool(**m, field, i)) {
921 rv[i] = CLIPS::Value(
"TRUE", CLIPS::TYPE_SYMBOL);
923 rv[i] = CLIPS::Value(
"FALSE", CLIPS::TYPE_SYMBOL);
926 case FieldDescriptor::TYPE_STRING:
927 rv[i] = CLIPS::Value(refl->GetRepeatedString(**m, field, i));
929 case FieldDescriptor::TYPE_MESSAGE: {
930 const google::protobuf::Message &msg = refl->GetRepeatedMessage(**m, field, i);
931 google::protobuf::Message * mcopy = msg.New();
932 mcopy->CopyFrom(msg);
933 void *ptr =
new std::shared_ptr<google::protobuf::Message>(mcopy);
934 rv[i] = CLIPS::Value(ptr);
936 case FieldDescriptor::TYPE_BYTES:
937 rv[i] = CLIPS::Value((
char *)
"BYTES", CLIPS::TYPE_SYMBOL);
939 case FieldDescriptor::TYPE_ENUM:
940 rv[i] = CLIPS::Value(refl->GetRepeatedEnum(**m, field, i)->name(), CLIPS::TYPE_SYMBOL);
942 case FieldDescriptor::TYPE_SFIXED32:
943 case FieldDescriptor::TYPE_INT32:
944 case FieldDescriptor::TYPE_SINT32:
945 rv[i] = CLIPS::Value(refl->GetRepeatedInt32(**m, field, i));
947 case FieldDescriptor::TYPE_SFIXED64:
948 case FieldDescriptor::TYPE_SINT64:
949 case FieldDescriptor::TYPE_INT64:
950 rv[i] = CLIPS::Value(refl->GetRepeatedInt64(**m, field, i));
952 default:
throw std::logic_error(
"Unknown protobuf field type encountered");
960 ClipsProtobufCommunicator::clips_pb_field_is_list(
void *msgptr, std::string field_name)
962 std::shared_ptr<google::protobuf::Message> *m =
963 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
965 return CLIPS::Value(
"FALSE", CLIPS::TYPE_SYMBOL);
967 const Descriptor * desc = (*m)->GetDescriptor();
968 const FieldDescriptor *field = desc->FindFieldByName(field_name);
970 return CLIPS::Value(
"FALSE", CLIPS::TYPE_SYMBOL);
971 return CLIPS::Value(field->is_repeated() ?
"TRUE" :
"FALSE", CLIPS::TYPE_SYMBOL);
975 ClipsProtobufCommunicator::clips_assert_message(std::pair<std::string, unsigned short> &endpoint,
978 std::shared_ptr<google::protobuf::Message> &msg,
979 ClipsProtobufCommunicator::ClientType ct,
982 CLIPS::Template::pointer temp = clips_->get_template(
"protobuf-msg");
985 gettimeofday(&tv, 0);
986 void * ptr =
new std::shared_ptr<google::protobuf::Message>(msg);
987 CLIPS::Fact::pointer fact = CLIPS::Fact::create(*clips_, temp);
988 fact->set_slot(
"type", msg->GetTypeName());
989 fact->set_slot(
"comp-id", comp_id);
990 fact->set_slot(
"msg-type", msg_type);
991 fact->set_slot(
"rcvd-via",
992 CLIPS::Value((ct == CT_PEER) ?
"BROADCAST" :
"STREAM", CLIPS::TYPE_SYMBOL));
993 CLIPS::Values rcvd_at(2, CLIPS::Value(CLIPS::TYPE_INTEGER));
994 rcvd_at[0] = tv.tv_sec;
995 rcvd_at[1] = tv.tv_usec;
996 fact->set_slot(
"rcvd-at", rcvd_at);
997 CLIPS::Values host_port(2, CLIPS::Value(CLIPS::TYPE_STRING));
998 host_port[0] = endpoint.first;
999 host_port[1] = CLIPS::Value(endpoint.second);
1000 fact->set_slot(
"rcvd-from", host_port);
1001 fact->set_slot(
"client-type",
1002 CLIPS::Value(ct == CT_CLIENT ?
"CLIENT" : (ct == CT_SERVER ?
"SERVER" :
"PEER"),
1003 CLIPS::TYPE_SYMBOL));
1004 fact->set_slot(
"client-id", client_id);
1005 fact->set_slot(
"ptr", CLIPS::Value(ptr));
1006 CLIPS::Fact::pointer new_fact = clips_->assert_fact(fact);
1010 logger_->
log_warn(
"CLIPS-Protobuf",
"Asserting protobuf-msg fact failed");
1012 delete static_cast<std::shared_ptr<google::protobuf::Message> *
>(ptr);
1016 logger_->
log_warn(
"CLIPS-Protobuf",
"Did not get template, did you load protobuf.clp?");
1022 ClipsProtobufCommunicator::handle_server_client_connected(ProtobufStreamServer::ClientID client,
1023 boost::asio::ip::tcp::endpoint &endpoint)
1025 long int client_id = -1;
1028 client_id = ++next_client_id_;
1029 client_endpoints_[client_id] = std::make_pair(endpoint.address().to_string(), endpoint.port());
1030 server_clients_[client_id] = client;
1031 rev_server_clients_[client] = client_id;
1035 clips_->assert_fact_f(
"(protobuf-server-client-connected %li %s %u)",
1037 endpoint.address().to_string().c_str(),
1042 ClipsProtobufCommunicator::handle_server_client_disconnected(ProtobufStreamServer::ClientID client,
1043 const boost::system::error_code &error)
1045 long int client_id = -1;
1048 RevServerClientMap::iterator c;
1049 if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1050 client_id = c->second;
1051 rev_server_clients_.erase(c);
1052 server_clients_.erase(client_id);
1056 if (client_id >= 0) {
1058 clips_->assert_fact_f(
"(protobuf-server-client-disconnected %li)", client_id);
1069 ClipsProtobufCommunicator::handle_server_client_msg(ProtobufStreamServer::ClientID client,
1070 uint16_t component_id,
1072 std::shared_ptr<google::protobuf::Message> msg)
1076 RevServerClientMap::iterator c;
1077 if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1078 clips_assert_message(
1079 client_endpoints_[c->second], component_id, msg_type, msg, CT_SERVER, c->second);
1090 ClipsProtobufCommunicator::handle_server_client_fail(ProtobufStreamServer::ClientID client,
1091 uint16_t component_id,
1096 RevServerClientMap::iterator c;
1097 if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1099 clips_->assert_fact_f(
"(protobuf-server-receive-failed (comp-id %u) (msg-type %u) "
1100 "(rcvd-via STREAM) (client-id %li) (message \"%s\") "
1101 "(rcvd-from (\"%s\" %u)))",
1106 client_endpoints_[c->second].first.c_str(),
1107 client_endpoints_[c->second].second);
1118 ClipsProtobufCommunicator::handle_peer_msg(
long int peer_id,
1119 boost::asio::ip::udp::endpoint & endpoint,
1120 uint16_t component_id,
1122 std::shared_ptr<google::protobuf::Message> msg)
1125 std::pair<std::string, unsigned short> endpp =
1126 std::make_pair(endpoint.address().to_string(), endpoint.port());
1127 clips_assert_message(endpp, component_id, msg_type, msg, CT_PEER, peer_id);
1135 ClipsProtobufCommunicator::handle_peer_recv_error(
long int peer_id,
1136 boost::asio::ip::udp::endpoint &endpoint,
1140 logger_->
log_warn(
"CLIPS-Protobuf",
1141 "Failed to receive peer message from %s:%u: %s",
1142 endpoint.address().to_string().c_str(),
1152 ClipsProtobufCommunicator::handle_peer_send_error(
long int peer_id, std::string msg)
1155 logger_->
log_warn(
"CLIPS-Protobuf",
"Failed to send peer message: %s", msg.c_str());
1160 ClipsProtobufCommunicator::handle_client_connected(
long int client_id)
1163 clips_->assert_fact_f(
"(protobuf-client-connected %li)", client_id);
1167 ClipsProtobufCommunicator::handle_client_disconnected(
long int client_id,
1168 const boost::system::error_code &error)
1171 clips_->assert_fact_f(
"(protobuf-client-disconnected %li)", client_id);
1175 ClipsProtobufCommunicator::handle_client_msg(
long int client_id,
1178 std::shared_ptr<google::protobuf::Message> msg)
1181 std::pair<std::string, unsigned short> endpp = std::make_pair(std::string(), 0);
1182 clips_assert_message(endpp, comp_id, msg_type, msg, CT_CLIENT, client_id);
1186 ClipsProtobufCommunicator::handle_client_receive_fail(
long int client_id,
1192 clips_->assert_fact_f(
"(protobuf-receive-failed (client-id %li) (rcvd-via STREAM) "
1193 "(comp-id %u) (msg-type %u) (message \"%s\"))",
1201 ClipsProtobufCommunicator::to_string(
const CLIPS::Value &v)
1204 case CLIPS::TYPE_UNKNOWN:
return "Unknown Type";
1205 case CLIPS::TYPE_FLOAT:
return std::to_string(v.as_float());
1206 case CLIPS::TYPE_INTEGER:
return std::to_string(v.as_integer());
1207 case CLIPS::TYPE_SYMBOL:
1208 case CLIPS::TYPE_INSTANCE_NAME:
1209 case CLIPS::TYPE_STRING:
return v.as_string();
1210 case CLIPS::TYPE_INSTANCE_ADDRESS:
1211 case CLIPS::TYPE_EXTERNAL_ADDRESS:
return boost::str(boost::format(
"%p") % v.as_address());
1213 return "Implicit unknown type";
Base class for exceptions in Fawkes.
virtual const char * what_no_backtrace() const noexcept
Get primary string (does not implicitly print the back trace).
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
Mutex mutual exclusion lock.
void enable_server(int port)
Enable protobuf stream server.
ClipsProtobufCommunicator(CLIPS::Environment *env, fawkes::Mutex &env_mutex, fawkes::Logger *logger=NULL)
Constructor.
~ClipsProtobufCommunicator()
Destructor.
void disable_server()
Disable protobuf stream server.