This commit is contained in:
siscia 2016-08-05 20:48:46 +02:00
commit 7e7334abe3
2 changed files with 209 additions and 0 deletions

136
backend/backend.go Normal file
View File

@ -0,0 +1,136 @@
package backend
import (
"bufio"
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/urfave/cli"
)
func createUrl(url string, port int) string {
return fmt.Sprintf("tcp://%s:%d", url, port)
}
type message struct {
Time time.Time `json:string`
Topic string `json:string`
Payload string `json:string`
}
func newRecoder(r *os.File, exitSignalCh chan os.Signal) chan MQTT.Message {
c := make(chan MQTT.Message)
go func() {
w := bufio.NewWriter(r)
for {
select {
case toSave := <-c:
{
now := time.Now()
m := message{now,
toSave.Topic(),
string(toSave.Payload())}
mStr, err := json.Marshal(m)
if err != nil {
log.Print("Problems with a message: topic %s, payload: %s", toSave.Topic(), toSave.Payload())
} else {
fmt.Fprintf(w, "%+v\n", string(mStr))
}
}
case <-exitSignalCh:
{
fmt.Println("Flushing to disk")
w.Flush()
r.Close()
os.Exit(1)
return
}
}
}
}()
return c
}
func StartRecording(c *cli.Context) {
url := createUrl(c.GlobalString("url"), c.GlobalInt("port"))
file, err := os.Create(c.GlobalString("record"))
if err != nil {
log.Fatal(err)
}
topic := c.GlobalString("topic")
opts := MQTT.NewClientOptions()
opts.AddBroker(url)
receiver := MQTT.NewClient(opts)
if token := receiver.Connect(); token.Wait() && token.Error() != nil {
log.Fatal(token.Error())
}
exitSignalCh := make(chan os.Signal)
signal.Notify(exitSignalCh, os.Interrupt)
signal.Notify(exitSignalCh, syscall.SIGTERM)
recoderCh := newRecoder(file, exitSignalCh)
f := func(receiver MQTT.Client, msg MQTT.Message) {
recoderCh <- msg
}
if token := receiver.Subscribe(topic, 1, f); token.Wait() && token.Error() != nil {
log.Fatal(token.Error())
}
for {
}
}
func PlayBack(c *cli.Context) {
url := createUrl(c.GlobalString("url"), c.GlobalInt("port"))
opts := MQTT.NewClientOptions()
opts.AddBroker(url)
sender := MQTT.NewClient(opts)
if token := sender.Connect(); token.Wait() && token.Error() != nil {
log.Fatal(token.Error())
}
playLoop := c.Bool("loop")
loopCount := 0
for playLoop || loopCount == 0 {
loopCount++
file, err := os.Open(c.GlobalString("record"))
if err != nil {
log.Fatal(err)
}
reader := bufio.NewScanner(file)
fastForward := c.Bool("ff")
var message message
var previousTime time.Time
firstRound := true
for reader.Scan() {
json.Unmarshal([]byte(reader.Text()), &message)
if !fastForward {
if !firstRound {
toWait := message.Time.Sub(previousTime)
previousTime = message.Time
time.Sleep(toWait)
} else {
firstRound = false
previousTime = message.Time
}
}
fmt.Println(message.Payload)
if token := sender.Publish(message.Topic, 1, false, message.Payload); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
}
}
}
}

73
main.go Normal file
View File

@ -0,0 +1,73 @@
package main
import (
"fmt"
"os"
"github.com/urfave/cli"
"github.com/siscia/mqtt-player/backend"
)
func main() {
app := cli.NewApp()
app.Name = "mqtt-player"
app.Flags = []cli.Flag{
cli.StringFlag{
Name: "record",
Usage: "What file use to record and playback",
Value: "mqtt-record.txt",
},
cli.StringFlag{
Name: "topic",
Usage: "What topic listen and what topic play back",
Value: "/#",
},
cli.StringFlag{
Name: "url",
Usage: "Where to listen to the MQTT broker",
Value: "localhost",
},
cli.IntFlag{
Name: "port",
Usage: "What port to use to connect to the broker",
Value: 1883,
},
}
app.Commands = []cli.Command{
{
Name: "record",
Aliases: []string{"r", "rec"},
Usage: "Record traffic from a MQTT broker",
Action: func(c *cli.Context) error {
fmt.Println(c.GlobalString("url"))
backend.StartRecording(c)
return nil
},
},
{
Name: "play",
Aliases: []string{"p"},
Usage: "Play back previous registered traffic to a MQTT broker",
Action: func(c *cli.Context) error {
fmt.Println("Playing back the recorded traffic from: ", c.GlobalString("record"))
backend.PlayBack(c)
return nil
},
Flags: []cli.Flag{
cli.BoolFlag{
Name: "ff",
Usage: "Fast forward, if true plays all the messages in the file without respecting the times differences"},
cli.BoolFlag{
Name: "loop",
Usage: "Loop the player and keep playing all the messages, in order, indefinitely",
},
},
},
}
app.Run(os.Args)
}