Skip to content

Running commands remotely on the pi

Using ssh to run commands on a remote Linux host to the pi.
1. Setup ssh-keys, using name socjcare for example.
2.  Generate  keys in Linux host, giving it a name in default location ~/.ssh.
3. Make sure the key is added, using
     ssh-add ~/ssh/socjcare
3. copy the public keys to the pi's ~/.ssh/authorized keys
4. test
5. To avoid entering the password, in the Linux host ~/.ssh/config
    enter this
a
6. Test by doing 
     ssh raspberrypi 
7. Should connect with entering the password.
8. Sometimes the key is not recognized because the ssh-agent is 
    not    running. To ensure that agent is running everytime, add
    this to .bashrc file
     # start ssh-agent
     eval "$(ssh-agent -s)"
9. 

Once done, create a python file on the Linux host, linux_to_pi.py

import subprocess
# Replace with your Pi's hostname or IP
#pi_host = "scarelo@192.9.0.213"  # Or whatever user/IP your Pi has
pi_host = "raspberrypi"  # Or whatever user/IP your Pi has
#script_path_on_pi = "/home/scarelo/myscript.py"  # The Python script to run on the Pi
#script_path_on_pi = "/home/scarelo/progress_bar.py"  # The Python script to run on the Pi
script_path_on_pi = "/home/scarelo/toggle_pin6.py"  # The Python script to run on the Pi
# Run the script on the Pi via SSH
cmd = ["ssh", pi_host, f"python3 -u  {script_path_on_pi}"]
# Start the subprocess and stream output live
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
print("Running script on Pi and streaming output:")
for line in process.stdout:
    print(line, end="")  # Already includes newline
process.wait()
print(f"Script exited with code {process.returncode}")

On the Raspberry pi, create the file myscript.py in /home/scarelo

#!/usr/bin/env python3
import pigpio
import time
PIN = 6  # GPIO6 (physical pin 31)
# Connect to local pigpiod daemon
pi = pigpio.pi()
if not pi.connected:
    print("Failed to connect to pigpiod. Is it running?")
    exit(1)
# Set pin as output
pi.set_mode(PIN, pigpio.OUTPUT)
print("Toggling GPIO pin 6... (Ctrl+C to stop)")
try:
    while True:
        pi.write(PIN, 1)  # Turn ON
        print("ON")  # Turn ON
        time.sleep(1)
        pi.write(PIN, 0)  # Turn OFF
        time.sleep(1)
        print("OFF")  # Turn ON
except KeyboardInterrupt:
    print("Exiting...")
finally:
    pi.write(PIN, 0)      # Turn off the pin
    pi.stop()             # Disconnect from pigpio daemon

Start pigpiod daemon on the pi.

On the linux host, start the Python command

python linux_to_pi.py

 

should start seeing the output

$ python linux_to_pi.py
Running script on Pi and streaming output:
Toggling GPIO pin 6... (Ctrl+C to stop)
ON
OFF
ON
OFF
ON
OFF
ON
OFF
ON
OFF
ON
OFF
ON

Improved Linux to pi- using Flask

 

© All Rights Reserved.

Improved  linux to pi communication

Running on Ubuntu

 

Assumes – flask already installed

ubuntu machine can access the pi using authorized keys

 

start by python flask_gui_watcher.py

 

starts flask and waits for a post from  the Bitbucket runner or from another site

from flask import Flask, request
import subprocess
import threading
import os

app = Flask(__name__)

# Optional: Set the correct display and Xauthority if needed
print(f"DISPLAY variable: {os.getenv('DISPLAY')}")
os.environ['DISPLAY'] = ':0'
os.environ['XAUTHORITY'] = f"/home/{os.getenv('USER')}/.Xauthority"

@app.route('/trigger-gui', methods=['POST'])
def trigger_gui():
    def launch():
        print("Launching PyQt5 GUI...")
        subprocess.Popen(["python3", "/home/kelk/ADLC_improved/display.py"])
    threading.Thread(target=launch).start()
    return {"status": "GUI launched"}, 200

if __name__ == '__main__':
    app.run(host='127.0.0.1', port=5001)

This is the display routine that waits from streaming data from the pi

import sys, json, subprocess, threading
from PyQt5.QtWidgets import QMainWindow, QApplication, QPushButton
from PyQt5.QtCore import QTimer
from adlc_ui import Ui_MainWindow


QApplication.setStyle("Fusion")

class Setup(QMainWindow, Ui_MainWindow):
    def __init__(self, parent=None):
        super().__init__(parent)
        self.setupUi(self)
        self.latest_data = {"analog": [0]*4, "digital": [0]*8}
        self.setdata()
        self.start_ssh_stream("scarelo@raspberrypi", "/home/scarelo/pi_streamer.py")  # ← Update Pi IP/path as needed

    def setdata(self):
        button_style = """
               QPushButton {
                   background-color: transparent;
                   color: black;
                   border: 2px solid gray;
                   border-radius: 4px;
               }
               QPushButton:disabled {
                   background-color: lightgray;
                   color: yellow;
               }
           """
        button_width = 80
        button_height = 25

        for i in range(8):
            dout = getattr(self, f"DOut{i}")
            dout.setFixedSize(button_width, button_height)
            dout.setEnabled(True)
            dout.setText('OFF')
            dout.setStyleSheet(button_style)

        for i in range(8):
            din = getattr(self, f"DIn{i}")
            din.setFixedSize(button_width, button_height)
            din.setEnabled(True)
            din.setText('OFF')
            din.setStyleSheet(button_style)

        self.DOut0.setStyleSheet("background-color: green; color: blue; ")
        self.DOut0.setText("ON")

        new_width = 120
        new_height = 40
        for lcd in [self.AOut0, self.AOut1, self.AOut2, self.AOut3]:
            geo = lcd.geometry()
            lcd.setGeometry(geo.x(), geo.y(), new_width, new_height)

        self.rx_timer = QTimer(self)
        self.rx_timer.timeout.connect(self.update_ui)
        self.rx_timer.start(500)

    def update_ui(self):
        data = self.latest_data
        for i, lcd in enumerate([self.AOut0, self.AOut1, self.AOut2, self.AOut3]):
            lcd.display(data["analog"][i])

        for i in range(8):
            dout = getattr(self, f"DOut{i}")
            value = data["digital"][i]
            if value:
                dout.setStyleSheet("background-color: green; color: blue; ")
                dout.setText("ON")
            else:
                dout.setStyleSheet("background-color: cyan; color: gray; ")
                dout.setText("OFF")

    def start_ssh_stream(self, pi_host, script_path_on_pi):
        def stream():
            while True:
                cmd = ["ssh", pi_host, f"python3 -u {script_path_on_pi}"]
                try:
                    print(f"Connecting to {pi_host}...")
                    process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)

                    for line in process.stdout:
                        line = line.strip()
                        try:
                            self.latest_data = json.loads(line)
                        except json.JSONDecodeError:
                            print("Invalid JSON:", line)

                except Exception as e:
                    print("SSH stream error:", e)

                print("SSH connection lost. Reconnecting in 5 seconds...")
                QTimer.singleShot(5000, lambda: None)  # Just a non-blocking wait
                time.sleep(5)

        thread = threading.Thread(target=stream, daemon=True)
        thread.start()


if __name__ == '__main__':
    app = QApplication(sys.argv)
    window = Setup()
    window.show()
    sys.exit(app.exec())

Running on the pi pi_streamer.py

import pigpio
import json
import time
import random  # Import the random module to generate random values

pi = pigpio.pi()
DIGITAL_PINS = [4, 17, 27, 22, 5, 6, 13, 19]

def get_random_analog_value():
    # Generate a random float between 9 and 10, rounded to the nearest 0.2
    return round(random.uniform(9, 10) / 0.2) * 0.2

while True:
    # Generate random values for each digital pin (0 or 1)
    digital = [random.randint(0, 1) for _ in DIGITAL_PINS]

    # Generate 4 random analog values between 9 and 10, varying by 0.2
    analog = [get_random_analog_value() for _ in range(4)]  # Simulate 4 random analog values

    print(json.dumps({"digital": digital, "analog": analog}), flush=True)
    time.sleep(0.5)

Leave a Reply

Your email address will not be published. Required fields are marked *