Skip to content
Snippets Groups Projects
Commit 26affdf2 authored by Erkan Karabulut's avatar Erkan Karabulut
Browse files

create EFPF connector

parent ca9a0c60
No related branches found
No related tags found
No related merge requests found
Showing
with 574 additions and 595 deletions
......@@ -38,7 +38,7 @@ build_docker:
# script is the only required keyword that a job needs. It’s a shell script that is executed by the runner.
# A good practice is to have the scripts in files and call the files here
script:
- docker build -t $CI_REGISTRY_IMAGE/tsmatch-api:latest -f ./TSMatch_API/Dockerfile ./TSMatch_API
- docker build -t $CI_REGISTRY_IMAGE/tsmatch-api:latest -f ./EFPF_Connector/Dockerfile ./EFPF_Connector
- docker build -t $CI_REGISTRY_IMAGE/tsmatch-engine:latest -f ./TSMatch_Engine/Dockerfile ./TSMatch_Engine
- docker build -t $CI_REGISTRY_IMAGE/graphdb:latest -f ./graphdb/Dockerfile ./graphdb
- docker build -t $CI_REGISTRY_IMAGE/broker:latest -f ./broker/Dockerfile ./broker
......
File moved
File moved
......@@ -9,21 +9,15 @@ BROKER_USERNAME=iiotlabadmin
BROKER_PASSWORD=iiotlabadmin
# external broker information
# BROKER_URL_EXTERNAL=mqtt://broker.hivemq.com:1883
BROKER_URL_EXTERNAL=mqtt://10.0.33.39:1883
BROKER_USERNAME_EXTERNAL=iiotlabadmin
BROKER_PASSWORD_EXTERNAL=iiotlabadmin
docker
TOPIC_DISCOVERY=INNOVINT/DISCOVERY/NDATA/TSMATCH_INNOVINT_1
TOPIC_DEADVERTIZE=INNOVINT/DEADVERTIZE/NDATA/TSMATCH_INNOVINT_1
TOPIC_OBSERVATION=INNOVINT/OBSERVATION/NDATA/TSMATCH_INNOVINT_1
TOPIC_REQUEST=INNOVINT/HAMBURG_FACTORY1/REQUEST
TOPIC_DELETE_REQUEST=INNOVINT/HAMBURG_FACTORY1/DELETE_REQUEST
TOPIC_RESPONSE=INNOVINT/RESPONSE/NDATA/TSMATCH_INNOVINT_1
TOPIC_REQUEST_DISCOVERY_ALL=INNOVINT/HAMBURG_FACTORY1/REQUEST_DISCOVERY_ALL
TOPIC_RESPONSE_DISCOVERY_ALL=INNOVINT/DISCOVERY_ALL/NDATA/TSMATCH_INNOVINT_1
BROKER_URL_EXTERNAL=mqtt://ds-test.smecluster.com/rabbitmq:8883
BROKER_USERNAME_EXTERNAL=iai:user-inov
BROKER_PASSWORD_EXTERNAL=0UQaC?0UkaJ
NEO4J_HOST=10.0.33.39
NEO4J_PORT=7687
NEO4J_DATABASE=neo4j
NEO4J_USERNAME=neo4j
NEO4J_PASSWORD=iiotlabadmin
TOPIC_DISCOVERY=TOPIC_DISCOVERY
TOPIC_DEADVERTIZE=TOPIC_DEADVERTIZE
TOPIC_OBSERVATION=TOPIC_OBSERVATION
TOPIC_REQUEST=TOPIC_REQUEST
TOPIC_DELETE_REQUEST=TOPIC_DELETE_REQUEST
TOPIC_RESPONSE=TOPIC_RESPONSE
TOPIC_REQUEST_DISCOVERY_ALL=TOPIC_REQUEST_DISCOVERY_ALL
TOPIC_RESPONSE_DISCOVERY_ALL=TOPIC_RESPONSE_DISCOVERY_ALL
File moved
File moved
File moved
File moved
File moved
File moved
......@@ -8,39 +8,37 @@
import {Components, Configuration, Container, CoreTypes, mergeConfigurations, Runtime, User} from "@coaty/core";
import {NodeUtils} from "@coaty/core/runtime-node";
import {serviceConfig} from "../shared/config";
import {serviceConfigRaw} from "../shared/configRaw";
import {serviceConfig} from "../shared/config";
import {agentInfo} from "./agent.info";
import {RawController} from "./RawController";
import {SensorController} from "./SensorController";
import {TaskController} from "./TaskController";
import {BaseRepository} from "../shared/repository/BaseRepository";
import {EFPFController} from "./EFPFController";
import {TSMatchController} from "./TSMatchController";
const components: Components = {
const TSMatchComponents: Components = {
controllers: {
TaskController,
SensorController,
TSMatchController,
},
};
const componentsRaw: Components = {
const EFPFComponents: Components = {
controllers: {
RawController,
EFPFController,
},
};
const configurationRaw: Configuration = mergeConfigurations(
const EFPFConfiguration: Configuration = mergeConfigurations(
serviceConfigRaw(),
{
controllers: {
RawController: {
EFPFController: {
requestGenerationInterval: 10000,
},
},
});
const configuration: Configuration = mergeConfigurations(
const TSMatchConfiguration: Configuration = mergeConfigurations(
serviceConfig(agentInfo, "Service"),
{
common: {
......@@ -55,43 +53,29 @@ const configuration: Configuration = mergeConfigurations(
},
},
controllers: {
TaskController: {
requestGenerationInterval: 10000,
},
SensorController: {
TSMatchController: {
requestGenerationInterval: 10000,
},
},
});
BaseRepository.registerDBAdapter();
// First, initialize the database
BaseRepository.initializeDatabase(configuration.databases)
.then(() => {
// Then, create the Coaty container with the specified components and
// autostart the communication manager.
const container = Container.resolve(components, configuration);
// Log broker connection state changes (online/offline) to the console.
NodeUtils.logCommunicationState(container);
const containerRaw = Container.resolve(componentsRaw, configurationRaw);
NodeUtils.logCommunicationState(containerRaw);
const TSMatchContainer = Container.resolve(TSMatchComponents, TSMatchConfiguration);
const EFPFContainer = Container.resolve(EFPFComponents, EFPFConfiguration);
const rawController = containerRaw.getController<RawController>("RawController");
const taskController = container.getController<TaskController>("TaskController");
const sensorController = container.getController<SensorController>("SensorController");
NodeUtils.logCommunicationState(TSMatchContainer);
NodeUtils.logCommunicationState(EFPFContainer);
// register tsmatch api tasks
taskController.registerRaw(rawController.rawObservable);
taskController.unsubscribeRequestRaw(rawController.deleteRequestObservable);
sensorController.GetAllSensorsRaw(rawController.AllDiscoveredSensors);
rawController.registerResponse(taskController.responseObservable);
rawController.registerResults(taskController.resultsObservable);
rawController.registerSensorDiscovery(sensorController.sensorDiscoveryObservable);
rawController.registerSensorDeadvertize(sensorController.sensorDeadvertisedObservable);
rawController.registerAllSensorDiscovery(sensorController.allSensorDiscoveryObservable);
})
.catch(error => NodeUtils.logError(error, "Failed to initialize database."));
const tsmatchController = TSMatchContainer.getController<TSMatchController>("TSMatchController");
const efpfController = EFPFContainer.getController<EFPFController>("EFPFController");
// register tsmatch api tasks
tsmatchController.registerRaw(efpfController.rawObservable);
tsmatchController.unsubscribeRequestRaw(efpfController.deleteRequestObservable);
tsmatchController.GetAllSensorsRaw(efpfController.AllDiscoveredSensors);
efpfController.registerResponse(tsmatchController.responseObservable);
efpfController.registerResults(tsmatchController.resultsObservable);
efpfController.registerSensorDiscovery(tsmatchController.sensorDiscoveryObservable);
efpfController.registerSensorDeadvertise(tsmatchController.sensorDeadvertisedObservable);
efpfController.registerAllSensorDiscovery(tsmatchController.allSensorDiscoveryObservable);
/**
* Copyright (C) 2020 fortiss GmbH
* @author Nisrine Bnouhanna – {@link "bnouhanna@fortiss.org"}
* @author Erkan Karabulut – {@link "karabulut@fortiss.org"}
* @version 1.1
* an API that receives service request and publishes observation based on the specified request
* Establish communication between EFPF broker and TSMatch
* Uses coatyio Siemens AG. Licensed under the MIT License
*/
......@@ -14,7 +14,7 @@ import {Subject} from "rxjs";
/**
* Generate a service request based on user input then advertize it as object request
*/
export class RawController extends Controller {
export class EFPFController extends Controller {
private _rawSubject = new Subject();
private _deleteRequest = new Subject();
......@@ -94,11 +94,11 @@ export class RawController extends Controller {
* Subscribe to the sensor deadvertisement topic and advertise the messages
* @param rawSensorDeadvertise
*/
registerSensorDeadvertize(rawSensorDeadvertise) {
rawSensorDeadvertise.subscribe((sensorDeadvertize) => {
registerSensorDeadvertise(rawSensorDeadvertise) {
rawSensorDeadvertise.subscribe((sensorDeadvertise) => {
console.log("Sensor deadvertised");
this.communicationManager
.publishRaw(process.env.TOPIC_DEADVERTIZE, JSON.stringify(sensorDeadvertize));
.publishRaw(process.env.TOPIC_DEADVERTIZE, JSON.stringify(sensorDeadvertise));
});
}
......
/**
* Copyright (C) 2020 fortiss GmbH
* @author Nisrine Bnouhanna {@link "bnouhanna@fortiss.org"}
* @version 1.1
* an API that receives service request and publishes observation based on the specified request
* Uses coatyio Siemens AG. Licensed under the MIT License
*/
import {Controller} from "@coaty/core";
import {Subject} from "rxjs";
/**
* Generate a service request based on user input then advertize it as object request
*/
export class TSMatchController extends Controller {
serviceRequest;
private _response = new Subject();
private _results = new Subject();
private _sensorsSubscription = new Subject();
private _sensorsDeadvertise = new Subject();
private _allSensorsSubscription = new Subject();
onInit() {
super.onInit();
}
onCommunicationManagerStarting() {
super.onCommunicationManagerStarting();
this.communicationManager
.observeRaw(process.env.TOPIC_DISCOVERY)
.subscribe(discoveredSensor => {
this._sensorsSubscription.next(JSON.parse(discoveredSensor[1].toString()));
});
this.communicationManager
.observeRaw(process.env.TOPIC_DEADVERTIZE)
.subscribe(deadvertisedSensor => {
this._sensorsDeadvertise.next(JSON.parse(deadvertisedSensor[1].toString()));
});
this.communicationManager
.observeRaw(process.env.TOPIC_OBSERVATION)
.subscribe(observation => {
this._results.next(JSON.parse(observation[1].toString()));
});
}
registerRaw(rawObs) {
rawObs.subscribe(([topic, payload]) => {
this.communicationManager
.publishRaw(process.env.TOPIC_REQUEST, payload.toString());
});
this.communicationManager
.observeRaw(process.env.TOPIC_RESPONSE)
.subscribe(response => {
this._response.next(JSON.parse(response[1].toString()));
});
}
unsubscribeRequestRaw(rawUnsubscribeRaw) {
rawUnsubscribeRaw.subscribe(([topic, payload]) => {
this.communicationManager
.publishRaw(process.env.TOPIC_DELETE_REQUEST, JSON.stringify(payload));
});
}
get responseObservable() {
return this._response.asObservable();
}
get resultsObservable() {
return this._results.asObservable();
}
GetAllSensorsRaw(rawAllSensors) {
rawAllSensors.subscribe(([topic, payload]) => {
this.communicationManager
.publishRaw(process.env.TOPIC_REQUEST_DISCOVERY_ALL, JSON.stringify(payload));
});
this.communicationManager
.observeRaw(process.env.TOPIC_RESPONSE_DISCOVERY_ALL)
.subscribe(data => {
this._allSensorsSubscription.next(JSON.parse(data[1].toString()))
});
}
get sensorDeadvertisedObservable() {
return this._sensorsDeadvertise.asObservable();
}
get sensorDiscoveryObservable() {
return this._sensorsSubscription.asObservable();
}
get allSensorDiscoveryObservable() {
return this._allSensorsSubscription.asObservable();
}
}
......@@ -6,8 +6,8 @@
* Uses coatyio Siemens AG. Licensed under the MIT License
*/
import { AgentInfo, Configuration} from "@coaty/core";
import { NodeUtils } from "@coaty/core/runtime-node";
import {AgentInfo, Configuration} from "@coaty/core";
import {NodeUtils} from "@coaty/core/runtime-node";
NodeUtils.logInfo(`BROKER_URL=${process.env.BROKER_URL}`);
......@@ -26,27 +26,16 @@ export function serviceConfig(agentInfo: AgentInfo, agentName: string): Configur
return {
common: {
agentInfo,
agentIdentity: { name: agentName },
agentIdentity: {name: agentName},
},
communication: {
brokerUrl: process.env.BROKER_URL,
mqttClientOptions : {
mqttClientOptions: {
username: process.env.BROKER_USERNAME,
password: process.env.BROKER_PASSWORD,
},
namespace: "coaty.demonstrator2",
shouldAutoStart: true,
},
databases: {
neo4j: {
adapter: "Neo4jAdapter",
connectionString: `bolt://${process.env.NEO4J_HOST}:${process.env.NEO4J_PORT}`,
connectionOptions: {
NEO4J_USERNAME: process.env.NEO4J_USERNAME,
NEO4J_PASSWORD: process.env.NEO4J_PASSWORD,
NEO4J_DATABASE: process.env.NEO4J_DATABASE,
}
}
},
}
};
}
......@@ -6,14 +6,14 @@
* Uses coatyio Siemens AG. Licensed under the MIT License
*/
import { Configuration} from "@coaty/core";
import { NodeUtils } from "@coaty/core/runtime-node";
// import * as fs from "fs";
// const brokerURL = "mqtts://dataspine.efpf.linksmart.eu:8883";
// const brokerURL = "mqtt://broker.hivemq.com:1883";
// import * as path from "path";
import {Configuration} from "@coaty/core";
import {NodeUtils} from "@coaty/core/runtime-node";
import * as fs from "fs";
import * as path from "path";
NodeUtils.logInfo(`BROKER_URL=${process.env.BROKER_URL_EXTERNAL}`);
// const CERT = fs.readFileSync(path.resolve("./src/shared/cacert-2020-07-22.pem"));
const CERT = fs.readFileSync(path.resolve("./src/shared/cacert-2021-10-26.pem"));
if (!process.env.BROKER_URL_EXTERNAL) {
NodeUtils.logError(new Error("Missing Broker URL"), "Environment variable BROKER_URL not specified.");
......@@ -29,14 +29,14 @@ export function serviceConfigRaw(): Configuration {
return {
communication: {
brokerUrl: process.env.BROKER_URL_EXTERNAL,
mqttClientOptions : {
mqttClientOptions: {
username: process.env.BROKER_USERNAME_EXTERNAL,
password: process.env.BROKER_PASSWORD_EXTERNAL,
// username: "for:bnouhanna@fortiss.org",
// password: "ARFiKh71xR5nTSQSPMnSS9BB",
// rejectUnauthorized: false,
// cert: CERT,
// protocol: "mqtts",
rejectUnauthorized: false,
cert: CERT,
protocol: "mqtts",
},
shouldAutoStart: true,
},
......
File moved
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment