#!/bin/python3 # Importing the library import psutil import argparse import time import subprocess import json from pathlib import Path from typing import Dict, List import requests parser = argparse.ArgumentParser() parser.add_argument("-c", "--config", type=str, default=".karanage.json", help="json configuration file") parser.add_argument("-u", "--url", type=str, default="http://localhost:20080/karanage/api/state", help="Base URL of the web service") parser.add_argument("-g", "--group", type=str, default="home", help="Group the the message") parser.add_argument("-t", "--topic", type=str, default="PC/system", help="Topic of the message") parser.add_argument("-s", "--sleep", type=int, default=10, help="Periodicity of the messages") args = parser.parse_args() if Path(args.config).exists(): f = open(args.config, "r") configuration = json.loads(f.read()) f.close() else: configuration = { "cpu": "auto", "memory": "auto", "swap": "auto", "drive": "auto", "network": "auto", } def send_to_server(data: Dict) -> None: ret = requests.post(f"{args.url}/{args.group}/{args.topic}", json=out) if 200 <= ret.status_code <= 299: pass # print(f" ==> done {ret}") else: print(f" ==> An error occured in sending message to the server !! {ret}") cpu_core_count = psutil.cpu_count(logical=False) cpu_thread_count = psutil.cpu_count() def get_mounts() -> Dict: process = subprocess.Popen( ["mount", "-v"], stdout=subprocess.PIPE, stderr=subprocess.PIPE ) stdout, stderr = process.communicate() """get data format: $ mount -v proc on /proc type proc (rw,nosuid,nodev,noexec,relatime) sys on /sys type sysfs (rw,nosuid,nodev,noexec,relatime) dev on /dev type devtmpfs (rw,nosuid,relatime,size=7049156k,nr_inodes=1762289,mode=755,inode64) run on /run type tmpfs (rw,nosuid,nodev,relatime,mode=755,inode64) efivarfs on /sys/firmware/efi/efivars type efivarfs (rw,nosuid,nodev,noexec,relatime) /dev/nvme0n1p5 on / type ext4 (rw,relatime,stripe=32) """ out = {} lines = stdout.decode("utf-8").split("\n") for line in lines: tmp = line.split(" ") if len(tmp) <= 2: continue if tmp[0].startswith('/dev/'): out[tmp[2]] = tmp[0][5:] return out mounting_points = get_mounts(); def filter(data: Dict, filter_list: List[str]) -> Dict: out = {} print(f"Request filter {data.keys()} with filter {filter_list}") for key in data: if key in filter_list: out[key] = data[key] return out def need_process(data: Dict) -> Dict: return configuration["cpu"] == "auto" or "include" in configuration["cpu"] def create_cpu_data() -> Dict: # scpufreq(current=1605.5205625000003, min=400.0, max=4372.0) cpu_frequency = psutil.cpu_freq(percpu=False) cpu_percent_use = psutil.cpu_percent() return { "core": cpu_core_count, "thread": cpu_thread_count, "frequency": cpu_frequency.current, "use": cpu_percent_use, "max": cpu_frequency.max, } def create_memory_data() -> Dict: # svmem(total=14473519104, available=8289726464, percent=42.7, used=5380497408, free=3276263424, active=1775763456, inactive=8335540224, buffers=361771008, cached=5454987264, shared=243720192, slab=544526336) mem = psutil.virtual_memory() return { "used": mem.used, "total": mem.total, } def create_swap_data() -> Dict: # sswap(total=17179865088, used=262144, free=17179602944, percent=0.0, sin=0, sout=0) swap = psutil.swap_memory() return { "used": swap.used, "total": swap.total, } def create_drive_data() -> Dict: tmp_elem = {} # nvme0n1p1 => sdiskio(read_count=187, write_count=3, read_bytes=6002176, write_bytes=5120, read_time=36, write_time=7, read_merged_count=504, write_merged_count=0, busy_time=67) drive_access = psutil.disk_io_counters(perdisk=True) for elem in mounting_points: # sdiskusage(total=255162540032, used=112077000704, free=130049380352, percent=46.3) drive_usage = psutil.disk_usage(elem) # print(f"plop {mounting_points[elem]} ==> {drive_access.keys()}") if mounting_points[elem] in drive_access: tmp_elem[elem] = { "read_bytes": drive_access[mounting_points[elem]].read_bytes, "write_bytes": drive_access[mounting_points[elem]].write_bytes, "used": drive_usage.used, "total": drive_usage.total, } else: tmp_elem[elem] = { "used": drive_usage.used, "total": drive_usage.total, } return tmp_elem def create_network_data() -> Dict: # eth0 => snetio(bytes_sent=0, bytes_recv=0, packets_sent=0, packets_recv=0, errin=0, errout=0, dropin=0, dropout=0) data = psutil.net_io_counters(pernic=True) tmp_elem = {} for elem in data: tmp_elem[elem] = { "bytes_sent": data[elem].bytes_sent, "bytes_recv": data[elem].bytes_recv, } return tmp_elem def agglutinate(configuration, name, data): if configuration[name] == "auto": return data elif "include" in configuration[name]: return filter(data, configuration[name]["include"]) return none while True: out = {} if need_process(configuration["cpu"]): base_elem = create_cpu_data() out["cpu"] = agglutinate(configuration, "cpu", base_elem) if need_process(configuration["memory"]): base_elem = create_memory_data() out["memory"] = agglutinate(configuration, "memory", base_elem) if need_process(configuration["swap"]): base_elem = create_swap_data() out["swap"] = agglutinate(configuration, "swap", base_elem) if need_process(configuration["drive"]): base_elem = create_drive_data() out["drive"] = agglutinate(configuration, "drive", base_elem) if configuration["network"] == "auto" or "include" in configuration["network"]: base_elem = create_network_data() out["network"] = agglutinate(configuration, "network", base_elem) #print("Read new value:") print(json.dumps(out, indent=4)) send_to_server(out) time.sleep(args.sleep)