SO, I have this code on multiple raspi cameras:
const express = require('express');
const WebSocket = require('ws');
const raspividStream = require('raspivid-stream');
const fs = require('fs');
const readline = require('readline');
const app = express();
const wss = require('express-ws')(app);
const { exec } = require('child_process');
const path = require('path');
var vstreamCounter = 0;
var videoStream = null;
var recording = false;
var websoc = null;
var currentDir = __dirname;
var camfn = 'camera.conf';
var p = path.join(currentDir, camfn);
if (fs.existsSync(p)) {
const fileStream = fs.createReadStream(p);
fileStream.on('error', (error) => {
console.error(`Error reading file: ${error.message}`);
});
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
});
rl.on('line', (line) => {
console.log(`Got ip from file: ${line}`);
var serverUrl = 'ws://'+line+':1337';
console.log(serverUrl);
websoc = new WebSocket(serverUrl);
websoc.on('open', function open() {
console.log('Connected to the server');
});
});
rl.on('close', () => {
console.log('File reading finished');
});
} else {
console.log("missing configuration file camera.conf");
process.exit();
}
app.get('/camera-status', (req, res) => {
exec('vcgencmd get_camera', (error, stdout, stderr) => {
if (error) {
console.error(`Error: ${error.message}`);
return;
}
if (stderr) {
console.error(`stderr: ${stderr}`);
return;
}
console.log(`stdout: ${stdout}`);
var position = stdout.indexOf(",");
if (position != -1) {
var sbstr = stdout.substring(0, position);
var rgxp = new RegExp("1", 'g');
var occurences = sbstr.match(rgxp);
if (occurences.length == 2){
res.send(200);
} else {
res.send(500);
}
}
});
});
app.ws('/vstream', async (ws, req) => {
console.log('Client connected');
vstreamCounter = vstreamCounter + 1;
ws.send(JSON.stringify({
action: "init",
width: 640,
height: 480
}));
videoStream = await raspividStream();
videoStream.on('data', (data) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(data, { binary: true }, (error) => { if (error) {console.error(error)/* process.exit()*/;} });
}
});
ws.on("message", async (msg) => {
await messageHandling(msg, null);
});
if (ws.readyState === WebSocket.OPEN) {
ws.on('close', async () => {
await closing(videoStream);
});
}
});
app.ws('/vstream-90', async (ws, req) => {
console.log('Client connected');
vstreamCounter = vstreamCounter + 1;
ws.send(JSON.stringify({
action: "init",
width: 640,
height: 480,
rotation: 90
}));
videoStream = await raspividStream({ rotation: 90 });
videoStream.on('data', (data) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(data, { binary: true }, (error) => { if (error) {console.error(error)/* process.exit()*/;} });
}
});
ws.on("message", async (msg) => {
await messageHandling(msg, 90);
});
if (ws.readyState === WebSocket.OPEN) {
ws.on('close', async () => {
await closing(videoStream);
});
}
});
app.ws('/vstream-180', async (ws, req) => {
console.log('Client connected');
vstreamCounter = vstreamCounter + 1;
ws.send(JSON.stringify({
action: "init",
width: 640,
height: 480,
rotation: 180
}));
videoStream = await raspividStream({ rotation: 180 });
videoStream.on('data', async (data) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(data, { binary: true }, (error) => { if (error) {console.error(error)/* process.exit()*/;} });
}
});
ws.on("message", async (msg) => {
await messageHandling(msg, 180);
});
if (ws.readyState === WebSocket.OPEN) {
ws.on('close', async () => {
await closing(videoStream);
});}
});
var recording = true;
var ss = null;
var recordLock = 0;
async function messageHandling(msg, rot){
if (msg.toString().includes("loc-")){
websoc.send(msg);
}
if (msg.toString().includes("desc-")){
websoc.send(msg);
}
if (msg.toString() === "record"){
//if (recordLock == 0) {
recordLock = recordLock + 1;
if (rot){
ss = await raspividStream({ rotation: rot });
} else {
ss = await raspividStream();
}
// if (recording){
ss.on('data', async (d) => {
if (websoc.readyState === WebSocket.OPEN) {
websoc.send(d, { binary: true }, (error) => { if (error) {console.error(error)/* process.exit()*/;} });
}
});
//}
//}
}
if (msg.toString() === "stoprecord"){
// if (recording) {
recording = false;
websoc.send("stoprecord");
console.log("stopped recording");
//process.exit();
// }
}
}
async function closing(){
console.log('Client left');
videoStream.removeAllListeners('data');
vstreamCounter = vstreamCounter - 1;
//await burnThemInClose(filename, location);
console.log(vstreamCounter);
if (vstreamCounter === 0){
process.exit();
}
}
app.listen(8080, function(err){
if (err) console.log("Error in server setup")
console.log("Server listening on Port");
});
This is server side code that cameras connect to for processing the stream:
const WebSocket = require('ws')
const fs = require('fs');
const ffmpeg = require('fluent-ffmpeg');
const { finished } = require('stream');
const { promisify } = require('util');
const finishedAsync = promisify(finished);
const writeFileAsync1 = promisify((stream, data, callback) => {
stream.write(data, callback);
});
const server = new WebSocket.Server({ port: 1337 });
let location = null;
server.on('connection', (ws) => {
console.log('Client connected');
let writeStream = null;
let filename = null;
let view = null;
let camcnt = 0;
ws.on('message', async (message) => {
// console.log("in message handler");
const msgStr = message.toString();
if (msgStr.includes("desc-")) {
view = msgStr.slice(5);
console.log(`View set to: ${view}`);
}
if (msgStr.includes("loc-")) {
location = msgStr.slice(4);
console.log(`Location set to: ${location}`);
}
if (msgStr.includes("cam") && camcnt === 0) {
console.log(`Camera message received: ${msgStr}`);
if (!location || !view) {
console.error('Location or view not set. Cannot create filename.');
return;
}
filename = generateFileName(location, view);
writeStream = fs.createWriteStream(filename);
console.log(`Write stream created: ${filename}`);
camcnt++;
}
if (writeStream && !msgStr.startsWith("desc-") && !msgStr.startsWith("loc-") && !msgStr.includes("cam1") && !msgStr.includes("cam2")) {
if (!writeStream.writableEnded) {
try {
await writeFileAsync1(writeStream, message);
console.log('Data written to file');
} catch (error) {
console.error('Error writing to stream:', error);
}
}
}
if (msgStr === "stoprecord") {
if (writeStream) {
if (!writeStream.writableEnded) {
writeStream.end();
try {
await finishedAsync(writeStream);
console.log('Write stream ended');
} catch (error) {
console.error('Error ending the stream:', error);
}
}
}
// ws.close();
console.log('WebSocket connection closed');
}
});
ws.on('close', async () => {
console.log('Client disconnected');
if (writeStream && !writeStream.writableEnded) {
writeStream.end();
try {
await finishedAsync(writeStream);
console.log('Write stream ended');
} catch (error) {
console.error('Error ending the stream:', error);
}
}
});
});
function generateFileName(location, view) {
var d = new Date();
var filename = location +"_"+view+"_"+
d.getFullYear() + "-" +
("00" + (d.getMonth() + 1)).slice(-2) + "-" +
("00" + d.getDate()).slice(-2) + "_" +
("00" + d.getHours()).slice(-2) + ":" +
("00" + d.getMinutes()).slice(-2) + ":" +
("00" + d.getSeconds()).slice(-2) + ".h264";
filename = filename.replace(/["']/g, "");
filename = filename.replace(/:/g, "_");
return filename;
}
async function reencodeVideo(inputPath, outputPath) {
return new Promise((resolve, reject) => {
ffmpeg(inputPath)
.videoCodec('libx264')
.audioCodec('aac')
.outputOptions('-movflags', 'faststart') // Ensures compatibility with streaming
.on('end', () => {
console.log('Video re-encoding complete.');
resolve();
})
.on('error', (err) => {
console.error('Error:', err);
reject(err);
})
.save(outputPath);
});
}
function getVideoMetadata(videofilename) {
return new Promise((resolve, reject) => {
ffmpeg.ffprobe(videofilename, (err, metadata) => {
if (err) {
reject(err);
} else {
resolve(metadata);
}
});
});
}
function generateSrtContent(duration, staticText, startTime) {
// Convert start time to seconds
const [hours, minutes, seconds] = startTime.split(':').map(Number);
const startTimeSeconds = hours * 3600 + minutes * 60 + seconds;
let srtContent = '';
for (let i = 0; i < duration; i++) {
const currentTime = startTimeSeconds + i;
const start = new Date(i * 1000).toISOString().substr(11, 8) + ',000';
const end = new Date((i + 1) * 1000).toISOString().substr(11, 8) + ',000';
const timeText = new Date(currentTime * 1000).toISOString().substr(11, 8);
srtContent += `${i + 1}\n`;
srtContent += `${start} --> ${end}\n`;
srtContent += `${staticText} ${timeText}\n\n`;
}
return srtContent;
}
function writeFileAsync(filePath, content) {
return new Promise((resolve, reject) => {
fs.writeFile(filePath, content, (err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}
function encodeVideoWithSubtitles(inputVideo, subtitleFile, outputVideo) {
return new Promise((resolve, reject) => {
ffmpeg(inputVideo)
.outputOptions('-vf', `subtitles=${subtitleFile}`)
.output(outputVideo)
.on('end', () => {
console.log('Video processing complete.');
resolve();
})
.on('error', (err) => {
console.error('Error:', err);
reject(err);
})
.run();
});
}
var staticcnt = 0;
async function burnThemInClose(filename, location){
try {
var d = new Date();
var hhmmss = ("00" + d.getHours()).slice(-2) + ":" +
("00" + d.getMinutes()).slice(-2) + ":" +
("00" + d.getSeconds()).slice(-2);
var staticTime = d.getFullYear() + "-" +
("00" + (d.getMonth() + 1)).slice(-2) + "-" +
("00" + d.getDate()).slice(-2);
var newMP4 = filename.slice(0, -5)+".mp4"
await reencodeVideo(filename, "static"+staticcnt+".mp4");
const metadata = await getVideoMetadata("static"+staticcnt+".mp4");
staticnt = staticcnt + 1;
console.log(metadata);
const duration = Math.floor(metadata.format.duration);
const outputSrt = "titulky.srt";
const srtContent = generateSrtContent(duration, location+" "+staticTime, hhmmss);
if (fs.existsSync(outputSrt)) {
fs.unlinkSync(outputSrt);
}
await writeFileAsync(outputSrt, srtContent);
// await encodeVideoWithSubtitles("static.mp4", outputSrt, newMP4);
if (fs.existsSync("static.mp4")) {
fs.unlinkSync("static.mp4");
}
if (fs.existsSync(filename)) {
// fs.unlinkSync(filename);
}
if (fs.existsSync(outputSrt)) {
fs.unlinkSync(outputSrt);
}
console.log('encoding finished');
} catch (err) {
console.error('Error:', err);
}
}
and this is angluar component for initializing and changing the streams:
import { AfterViewInit, Component, ElementRef, Input, OnInit, ViewChild } from '@angular/core';
import { RobotComponent } from '@shared/device-interfaces';
import { MatSnackBar } from '@angular/material/snack-bar';
import { WSAvcPlayer } from 'ts-h264-live-player';
import { ServerApiService } from '../../core/services/server-api.service';
import { from } from 'rxjs';
import { Router, ActivatedRoute } from '@angular/router';
const url = require('url');
@Component({
selector: 'app-h264-camera-streams',
templateUrl: './h264-camera-streamsponent.html',
styleUrls: ['./h264-camera-streamsponent.css'],
})
export class H264CameraStreamsComponent implements OnInit, AfterViewInit {
@Input() cameras: RobotComponent[];
@ViewChild('canvasOne') canvasOneElementRef: ElementRef;
@ViewChild('canvasTwo') canvasTwoElementRef: ElementRef;
playerOne: WSAvcPlayer;
playerTwo: WSAvcPlayer;
canvasOneElement: HTMLCanvasElement;
canvasTwoElement: HTMLCanvasElement;
useSplitScreen = false;
selectedCamera: RobotComponent;
isFormSaved: boolean;
isRecording: boolean;
isButtonEnabled: boolean;
postLocation: string;
selectedOption: any;
camerasBackgroud = [
'../../../assets/images/camera_1.png',
'../../../assets/images/camera_2.png',
'../../../assets/images/camera_3.png',
'../../../assets/images/camera_4.png',
];
city: string;
street: string;
data: any;
isLoading: any;
intervalId: any;
constructor(public snackBar: MatSnackBar, private api: ServerApiService, private router: Router, private activatedRoute: ActivatedRoute) {}
getLocations() {
let result = this.api.getLocations();
const observableFromPromise = from(result);
observableFromPromise.subscribe(
(response) => {
this.data = response;
this.isLoading = false;
},
(error) => {
console.error('Error fetching data', error);
this.isLoading = false;
}
);
}
toUrlEncoded(obj) {
return Object.keys(obj)
.map(key => encodeURIComponent(key) + '=' + encodeURIComponent(obj[key]))
.join('&');
}
async onOptionSelect(event: Event) {
const selectElement = event.target as HTMLSelectElement;
this.selectedOption = selectElement.value;
let data = {
location: this.selectedOption
};
let enc = this.toUrlEncoded(data);
console.log(this.selectedOption);
this.playerOne.send("loc-"+this.selectedOption);
this.playerTwo.send("loc-"+this.selectedOption);
this.api.postLocations(this.postLocation, enc);
this.isButtonEnabled = true;
}
saveForm(form: any) {
this.isFormSaved = this.city.trim().length > 0 && this.street.trim().length > 0;
if (this.isFormSaved){
let location: any = {"city":this.city,"street":this.street};
this.api.addLocation(location);
// window.location.reload();
} else {
console.log('Form is invalid');
}
};
ngOnInit(): void {
this.selectedCamera = this.cameras[0];
const parsedUrl = new URL(this.selectedCamera.camera_stream);
const hostname = parsedUrl.hostname;
const port = parsedUrl.port;
this.postLocation = "http://" + hostname + ":" + "8080" + "/post-location";
this.getLocations();
}
async ngAfterViewInit() {
if (this.cameras.length <= 0) return;
if(this.selectedCamera.split_screen){
this.useSplitScreen = true;
this.canvasOneElement = this.canvasOneElementRef.nativeElement;
this.canvasTwoElement = this.canvasTwoElementRef.nativeElement;
this.playerOne = new WSAvcPlayer(this.canvasOneElement);
this.playerTwo = new WSAvcPlayer(this.canvasTwoElement);
this.playerOne.connectByUrl(this.selectedCamera.camera_stream);
this.playerTwo.connectByUrl(this.selectedCamera.camera_stream_two);
//await this.delay(500);
//this.playerOne.send('REQUESTSTREAM');
//this.playerTwo.send('REQUESTSTREAM');
} else {
this.useSplitScreen = false;
this.canvasOneElement = this.canvasOneElementRef.nativeElement;
this.playerOne = new WSAvcPlayer(this.canvasOneElement);
this.playerOne.connectByUrl(this.selectedCamera.camera_stream);
await this.delay(500);
this.playerOne.send('REQUESTSTREAM');
}
}
getColor(camera: RobotComponent) {
console.log(camera)
console.log(this.selectedCamera)
if (camera.id === this.selectedCamera.id) return "primary"
else return "secondary"
}
delay(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
async recordStream() {
// if (this.useSplitScreen) {
this.playerOne.send("desc-"+this.selectedCamera.description+"_cam1");
this.playerOne.send("record");
await this.delay(500);
this.playerTwo.send("desc-"+this.selectedCamera.description+"_cam2");
this.playerTwo.send("record");
//} else {
//this.playerOne.send('record');
//}
this.isRecording = true;
}
async stopRecordStream() {
if (this.useSplitScreen) {
this.playerOne.send("stoprecord");
this.playerTwo.send("stoprecord");
} else {
this.playerOne.send('stoprecord');
}
this.isRecording = false;
}
async changeStream(camera: RobotComponent) {
this.selectedCamera = camera;
console.log(this.selectedCamera);
if (camera.split_screen){
this.playerOne.send("stoprecord");
this.playerTwo.send("stoprecord");
this.clearCanvas();
this.canvasTwoElement.hidden = false;
this.playerOne.disconnect();
this.playerTwo.disconnect();
this.playerOne.removeAllListeners();
this.playerTwo.removeAllListeners();
let ws = new WebSocket(this.selectedCamera.camera_stream);
let ws2 = new WebSocket(this.selectedCamera.camera_stream_two);
this.playerOne.connectWithCustomClient(ws);
this.playerTwo.connectWithCustomClient(ws2);
await this.delay(500);
this.playerOne.send("desc-"+camera.description+"_cam1");
await this.delay(1500);
this.playerOne.send("record");
await this.delay(500);
this.playerTwo.send("desc-"+camera.description+"_cam1");
await this.delay(1500);
this.playerTwo.send("record");
//this.playerOne.connectByUrl(camera.camera_stream);
//this.playerTwo.connectByUrl(camera.camera_stream_two);
console.log(this.playerOne);
console.log(this.playerTwo);
this.useSplitScreen = true;
} else {
// this.stopStream();
this.clearCanvas()
this.playerOne.disconnect;
this.playerOne.removeAllListeners;
this.playerOne.connectByUrl(camera.camera_stream);
await this.delay(500);
//this.playerOne.send('REQUESTSTREAM');
this.playerOne.send("desc-"+camera.description+"_cam1");
await this.delay(1500);
this.playerOne.send("record");
this.useSplitScreen = false;
}
}
clearCanvas() {
if (this.useSplitScreen) {
var contextOne = this.canvasOneElement.getContext('2d');
var contextTwo = this.canvasTwoElement.getContext('2d');
contextOne.clearRect(0, 0, this.canvasOneElement.width, this.canvasOneElement.height);
contextTwo.clearRect(0, 0, this.canvasTwoElement.width, this.canvasTwoElement.height);
this.canvasTwoElement.hidden = true;
} else {
var contextOne = this.canvasOneElement.getContext('2d');
contextOne.clearRect(0, 0, this.canvasOneElement.width, this.canvasOneElement.height);
}
}
refreshStream() {
this.stopStream();
this.delay(150);
this.startStream();
}
startStream() {
if (this.useSplitScreen) {
this.playerOne.send('REQUESTSTREAM');
this.playerTwo.send('REQUESTSTREAM');
} else {
this.playerOne.send('REQUESTSTREAM');
}
}
stopStream() {
if (this.useSplitScreen) {
this.playerOne.stopStream();
this.playerTwo.stopStream();
} else {
this.playerOne.stopStream();
}
}
}
I would like to start new write stream on processing serverside, when the angular changeStream function is called, with new view - with two cameras. Problem is that first loaded view is recorded to two separate files which is correct, but when i call change stream in angular, first two streams are ended which is correct but second view write streams are not created on serverside despite that "record" message is sent to the new newly created websocket stream.
Can any one help out please.