145 lines
3.2 KiB
Go
145 lines
3.2 KiB
Go
package backend
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
|
|
MQTT "github.com/eclipse/paho.mqtt.golang"
|
|
"github.com/urfave/cli"
|
|
)
|
|
|
|
func createUrl(url string, port int) string {
|
|
return fmt.Sprintf("tcp://%s:%d", url, port)
|
|
}
|
|
|
|
type message struct {
|
|
Time time.Time `json:string`
|
|
Topic string `json:string`
|
|
Payload string `json:string`
|
|
}
|
|
|
|
func newRecoder(r *os.File, exitSignalCh chan os.Signal) chan MQTT.Message {
|
|
c := make(chan MQTT.Message)
|
|
go func() {
|
|
w := bufio.NewWriter(r)
|
|
for {
|
|
select {
|
|
case toSave := <-c:
|
|
{
|
|
now := time.Now()
|
|
m := message{now,
|
|
toSave.Topic(),
|
|
string(toSave.Payload())}
|
|
mStr, err := json.Marshal(m)
|
|
if err != nil {
|
|
log.Print("Problems with a message: topic %s, payload: %s", toSave.Topic(), toSave.Payload())
|
|
} else {
|
|
fmt.Fprintf(w, "%+v\n", string(mStr))
|
|
}
|
|
}
|
|
|
|
case <-exitSignalCh:
|
|
{
|
|
fmt.Println("Flushing to disk")
|
|
w.Flush()
|
|
r.Close()
|
|
os.Exit(1)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
return c
|
|
}
|
|
|
|
func StartRecording(c *cli.Context) {
|
|
log.SetOutput(os.Stderr)
|
|
log.Print("START RECORDING")
|
|
url := createUrl(c.GlobalString("url"), c.GlobalInt("port"))
|
|
log.Print(" url=" + url)
|
|
file, err := os.Create(c.GlobalString("record"))
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
topic := c.GlobalString("topic")
|
|
log.Print(" topic=" + topic)
|
|
|
|
opts := MQTT.NewClientOptions()
|
|
opts.AddBroker(url)
|
|
log.Print(" user=" + c.GlobalString("user"))
|
|
log.Print(" password=" + c.GlobalString("password"))
|
|
opts.SetUsername(c.GlobalString("user"))
|
|
opts.SetPassword(c.GlobalString("password"))
|
|
|
|
receiver := MQTT.NewClient(opts)
|
|
if token := receiver.Connect(); token.Wait() && token.Error() != nil {
|
|
log.Fatal(token.Error())
|
|
}
|
|
exitSignalCh := make(chan os.Signal)
|
|
signal.Notify(exitSignalCh, os.Interrupt)
|
|
signal.Notify(exitSignalCh, syscall.SIGTERM)
|
|
|
|
recoderCh := newRecoder(file, exitSignalCh)
|
|
f := func(receiver MQTT.Client, msg MQTT.Message) {
|
|
recoderCh <- msg
|
|
}
|
|
|
|
if token := receiver.Subscribe(topic, 1, f); token.Wait() && token.Error() != nil {
|
|
log.Fatal(token.Error())
|
|
}
|
|
|
|
for {
|
|
}
|
|
|
|
}
|
|
|
|
func PlayBack(c *cli.Context) {
|
|
url := createUrl(c.GlobalString("url"), c.GlobalInt("port"))
|
|
|
|
opts := MQTT.NewClientOptions()
|
|
opts.AddBroker(url)
|
|
|
|
sender := MQTT.NewClient(opts)
|
|
if token := sender.Connect(); token.Wait() && token.Error() != nil {
|
|
log.Fatal(token.Error())
|
|
}
|
|
playLoop := c.Bool("loop")
|
|
loopCount := 0
|
|
for playLoop || loopCount == 0 {
|
|
loopCount++
|
|
file, err := os.Open(c.GlobalString("record"))
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
reader := bufio.NewScanner(file)
|
|
fastForward := c.Bool("ff")
|
|
var message message
|
|
var previousTime time.Time
|
|
firstRound := true
|
|
for reader.Scan() {
|
|
json.Unmarshal([]byte(reader.Text()), &message)
|
|
if !fastForward {
|
|
if !firstRound {
|
|
toWait := message.Time.Sub(previousTime)
|
|
previousTime = message.Time
|
|
time.Sleep(toWait)
|
|
} else {
|
|
firstRound = false
|
|
previousTime = message.Time
|
|
}
|
|
}
|
|
fmt.Println(message.Payload)
|
|
if token := sender.Publish(message.Topic, 1, false, message.Payload); token.Wait() && token.Error() != nil {
|
|
fmt.Println(token.Error())
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|