mqtt-stereo/backend/backend.go
2019-02-27 15:43:47 +01:00

145 lines
3.2 KiB
Go

package backend
import (
"bufio"
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/urfave/cli"
)
func createUrl(url string, port int) string {
return fmt.Sprintf("tcp://%s:%d", url, port)
}
type message struct {
Time time.Time `json:string`
Topic string `json:string`
Payload string `json:string`
}
func newRecoder(r *os.File, exitSignalCh chan os.Signal) chan MQTT.Message {
c := make(chan MQTT.Message)
go func() {
w := bufio.NewWriter(r)
for {
select {
case toSave := <-c:
{
now := time.Now()
m := message{now,
toSave.Topic(),
string(toSave.Payload())}
mStr, err := json.Marshal(m)
if err != nil {
log.Print("Problems with a message: topic %s, payload: %s", toSave.Topic(), toSave.Payload())
} else {
fmt.Fprintf(w, "%+v\n", string(mStr))
}
}
case <-exitSignalCh:
{
fmt.Println("Flushing to disk")
w.Flush()
r.Close()
os.Exit(1)
return
}
}
}
}()
return c
}
func StartRecording(c *cli.Context) {
log.SetOutput(os.Stderr)
log.Print("START RECORDING")
url := createUrl(c.GlobalString("url"), c.GlobalInt("port"))
log.Print(" url=" + url)
file, err := os.Create(c.GlobalString("record"))
if err != nil {
log.Fatal(err)
}
topic := c.GlobalString("topic")
log.Print(" topic=" + topic)
opts := MQTT.NewClientOptions()
opts.AddBroker(url)
log.Print(" user=" + c.GlobalString("user"))
log.Print(" password=" + c.GlobalString("password"))
opts.SetUsername(c.GlobalString("user"))
opts.SetPassword(c.GlobalString("password"))
receiver := MQTT.NewClient(opts)
if token := receiver.Connect(); token.Wait() && token.Error() != nil {
log.Fatal(token.Error())
}
exitSignalCh := make(chan os.Signal)
signal.Notify(exitSignalCh, os.Interrupt)
signal.Notify(exitSignalCh, syscall.SIGTERM)
recoderCh := newRecoder(file, exitSignalCh)
f := func(receiver MQTT.Client, msg MQTT.Message) {
recoderCh <- msg
}
if token := receiver.Subscribe(topic, 1, f); token.Wait() && token.Error() != nil {
log.Fatal(token.Error())
}
for {
}
}
func PlayBack(c *cli.Context) {
url := createUrl(c.GlobalString("url"), c.GlobalInt("port"))
opts := MQTT.NewClientOptions()
opts.AddBroker(url)
sender := MQTT.NewClient(opts)
if token := sender.Connect(); token.Wait() && token.Error() != nil {
log.Fatal(token.Error())
}
playLoop := c.Bool("loop")
loopCount := 0
for playLoop || loopCount == 0 {
loopCount++
file, err := os.Open(c.GlobalString("record"))
if err != nil {
log.Fatal(err)
}
reader := bufio.NewScanner(file)
fastForward := c.Bool("ff")
var message message
var previousTime time.Time
firstRound := true
for reader.Scan() {
json.Unmarshal([]byte(reader.Text()), &message)
if !fastForward {
if !firstRound {
toWait := message.Time.Sub(previousTime)
previousTime = message.Time
time.Sleep(toWait)
} else {
firstRound = false
previousTime = message.Time
}
}
fmt.Println(message.Payload)
if token := sender.Publish(message.Topic, 1, false, message.Payload); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
}
}
}
}