commit 7e7334abe3e37f44fff241043192e15f4004bffc Author: siscia Date: Fri Aug 5 20:48:46 2016 +0200 add code diff --git a/backend/backend.go b/backend/backend.go new file mode 100644 index 0000000..0ceb2b7 --- /dev/null +++ b/backend/backend.go @@ -0,0 +1,136 @@ +package backend + +import ( + "bufio" + "encoding/json" + "fmt" + "log" + "os" + "os/signal" + "syscall" + "time" + + MQTT "github.com/eclipse/paho.mqtt.golang" + "github.com/urfave/cli" +) + +func createUrl(url string, port int) string { + return fmt.Sprintf("tcp://%s:%d", url, port) +} + +type message struct { + Time time.Time `json:string` + Topic string `json:string` + Payload string `json:string` +} + +func newRecoder(r *os.File, exitSignalCh chan os.Signal) chan MQTT.Message { + c := make(chan MQTT.Message) + go func() { + w := bufio.NewWriter(r) + for { + select { + case toSave := <-c: + { + now := time.Now() + m := message{now, + toSave.Topic(), + string(toSave.Payload())} + mStr, err := json.Marshal(m) + if err != nil { + log.Print("Problems with a message: topic %s, payload: %s", toSave.Topic(), toSave.Payload()) + } else { + fmt.Fprintf(w, "%+v\n", string(mStr)) + } + } + + case <-exitSignalCh: + { + fmt.Println("Flushing to disk") + w.Flush() + r.Close() + os.Exit(1) + return + } + } + } + }() + return c +} + +func StartRecording(c *cli.Context) { + url := createUrl(c.GlobalString("url"), c.GlobalInt("port")) + file, err := os.Create(c.GlobalString("record")) + if err != nil { + log.Fatal(err) + } + topic := c.GlobalString("topic") + + opts := MQTT.NewClientOptions() + opts.AddBroker(url) + + receiver := MQTT.NewClient(opts) + if token := receiver.Connect(); token.Wait() && token.Error() != nil { + log.Fatal(token.Error()) + } + exitSignalCh := make(chan os.Signal) + signal.Notify(exitSignalCh, os.Interrupt) + signal.Notify(exitSignalCh, syscall.SIGTERM) + + recoderCh := newRecoder(file, exitSignalCh) + f := func(receiver MQTT.Client, msg MQTT.Message) { + recoderCh <- msg + } + + if token := receiver.Subscribe(topic, 1, f); token.Wait() && token.Error() != nil { + log.Fatal(token.Error()) + } + + for { + } + +} + +func PlayBack(c *cli.Context) { + url := createUrl(c.GlobalString("url"), c.GlobalInt("port")) + + opts := MQTT.NewClientOptions() + opts.AddBroker(url) + + sender := MQTT.NewClient(opts) + if token := sender.Connect(); token.Wait() && token.Error() != nil { + log.Fatal(token.Error()) + } + playLoop := c.Bool("loop") + loopCount := 0 + for playLoop || loopCount == 0 { + loopCount++ + file, err := os.Open(c.GlobalString("record")) + if err != nil { + log.Fatal(err) + } + reader := bufio.NewScanner(file) + fastForward := c.Bool("ff") + var message message + var previousTime time.Time + firstRound := true + for reader.Scan() { + json.Unmarshal([]byte(reader.Text()), &message) + if !fastForward { + if !firstRound { + toWait := message.Time.Sub(previousTime) + previousTime = message.Time + time.Sleep(toWait) + } else { + firstRound = false + previousTime = message.Time + } + } + fmt.Println(message.Payload) + if token := sender.Publish(message.Topic, 1, false, message.Payload); token.Wait() && token.Error() != nil { + fmt.Println(token.Error()) + } + } + } + +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..935e7c9 --- /dev/null +++ b/main.go @@ -0,0 +1,73 @@ +package main + +import ( + "fmt" + "os" + + "github.com/urfave/cli" + + "github.com/siscia/mqtt-player/backend" +) + +func main() { + app := cli.NewApp() + app.Name = "mqtt-player" + + app.Flags = []cli.Flag{ + cli.StringFlag{ + Name: "record", + Usage: "What file use to record and playback", + Value: "mqtt-record.txt", + }, + cli.StringFlag{ + Name: "topic", + Usage: "What topic listen and what topic play back", + Value: "/#", + }, + cli.StringFlag{ + Name: "url", + Usage: "Where to listen to the MQTT broker", + Value: "localhost", + }, + cli.IntFlag{ + Name: "port", + Usage: "What port to use to connect to the broker", + Value: 1883, + }, + } + + app.Commands = []cli.Command{ + { + Name: "record", + Aliases: []string{"r", "rec"}, + Usage: "Record traffic from a MQTT broker", + Action: func(c *cli.Context) error { + fmt.Println(c.GlobalString("url")) + backend.StartRecording(c) + return nil + }, + }, + { + Name: "play", + Aliases: []string{"p"}, + Usage: "Play back previous registered traffic to a MQTT broker", + Action: func(c *cli.Context) error { + fmt.Println("Playing back the recorded traffic from: ", c.GlobalString("record")) + backend.PlayBack(c) + return nil + }, + Flags: []cli.Flag{ + cli.BoolFlag{ + Name: "ff", + Usage: "Fast forward, if true plays all the messages in the file without respecting the times differences"}, + + cli.BoolFlag{ + Name: "loop", + Usage: "Loop the player and keep playing all the messages, in order, indefinitely", + }, + }, + }, + } + + app.Run(os.Args) +}