Using ssh from a remote Linux host to run commands on the Pi.
1. Setup ssh-keys, using name socjcare for example.
2. Generate keys in Linux host, giving it a name in default location ~/.ssh.
3. Make sure the key is added, using
ssh-add ~/.ssh/socjcare
3. Copy the public key to the Pi's ~/.ssh/authorized_keys file
4. test
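5. To avoid entering the password (and to connect using a short host name), add a host entry to ~/.ssh/config on the Linux host, for example:
Host raspberrypi
    HostName 192.9.0.213
    User scarelo
    IdentityFile ~/.ssh/socjcare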
6. Test by doing
ssh raspberrypi
7. It should connect without entering the password.
8. Sometimes the key is not recognized because the ssh-agent is
not running. To ensure that agent is running everytime, add
this to .bashrc file
# start ssh-agent
eval "$(ssh-agent -s)"
9.
Once done, create a python file on the Linux host, linux_to_pi.py
import shlex
import subprocess

# SSH target: a Host alias from ~/.ssh/config, or "user@address"
# (e.g. "scarelo@192.9.0.213").
pi_host = "raspberrypi"

# The Python script to run on the Pi. Other scripts used with this launcher:
#   /home/scarelo/myscript.py
#   /home/scarelo/progress_bar.py
script_path_on_pi = "/home/scarelo/toggle_pin6.py"

# ssh passes the remote command to the shell on the Pi, so the path is quoted
# to keep spaces or shell metacharacters from being re-interpreted there.
# "-u" makes the remote Python unbuffered so its output arrives line by line.
cmd = ["ssh", pi_host, f"python3 -u {shlex.quote(script_path_on_pi)}"]

# The context manager closes the pipe and waits for ssh even when the loop is
# interrupted (e.g. Ctrl+C), so neither the child nor its pipe is leaked.
with subprocess.Popen(
    cmd,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,  # fold remote stderr into the same stream
    text=True,
) as process:
    print("Running script on Pi and streaming output:")
    for line in process.stdout:
        print(line, end="")  # each line already ends with a newline

# Leaving the "with" block has waited for ssh, so returncode is populated.
print(f"Script exited with code {process.returncode}")
On the Raspberry Pi, create the file toggle_pin6.py in /home/scarelo (this is the script that linux_to_pi.py runs)
#!/usr/bin/env python3
"""Toggle GPIO6 once per second through the local pigpiod daemon.

Prints "ON"/"OFF" each time the pin changes so a caller reading this
script's output can follow the pin state.
"""
import time

import pigpio

PIN = 6  # GPIO6 (physical pin 31)

# Connect to local pigpiod daemon
pi = pigpio.pi()
if not pi.connected:
    print("Failed to connect to pigpiod. Is it running?")
    # SystemExit instead of exit(): exit() is a helper installed by the
    # site module and is not guaranteed to be available.
    raise SystemExit(1)

# Set pin as output
pi.set_mode(PIN, pigpio.OUTPUT)
print("Toggling GPIO pin 6... (Ctrl+C to stop)")

try:
    while True:
        pi.write(PIN, 1)  # Turn ON
        print("ON")
        time.sleep(1)
        pi.write(PIN, 0)  # Turn OFF
        # Report the change right away; printing after the sleep made "OFF"
        # show up a second late, together with the next "ON".
        print("OFF")
        time.sleep(1)
except KeyboardInterrupt:
    print("Exiting...")
finally:
    pi.write(PIN, 0)  # Leave the pin off
    pi.stop()  # Disconnect from pigpio daemon
Start pigpiod daemon on the pi.
On the linux host, start the Python command
python linux_to_pi.py
should start seeing the output
$ python linux_to_pi.py
Running script on Pi and streaming output:
Toggling GPIO pin 6... (Ctrl+C to stop)
ON
OFF
ON
OFF
ON
OFF
ON
OFF
ON
OFF
ON
OFF
ON
Improved Linux to pi- using Flask
© All Rights Reserved.
Improved linux to pi communication
Running on Ubuntu
Assumes – flask already installed
ubuntu machine can access the pi using authorized keys
start by python flask_gui_watcher.py
starts flask and waits for a post from the Bitbucket runner or from another site
from flask import Flask, request
import subprocess
import threading
import os
# Flask application object; the route defined below is registered on it.
app = Flask(__name__)
# Point GUI child processes at the local X display. The value inherited at
# startup is printed first so the override is visible in the log.
print(f"DISPLAY variable: {os.getenv('DISPLAY')}")
os.environ['DISPLAY'] = ':0'
# Build the path from the real home directory: USER can be unset (e.g. under
# cron or a service manager), which used to yield "/home/None/.Xauthority",
# and the home directory is not always /home/<user>.
os.environ['XAUTHORITY'] = os.path.join(os.path.expanduser('~'), '.Xauthority')
@app.route('/trigger-gui', methods=['POST'])
def trigger_gui():
    """Launch the PyQt5 display GUI in response to a POST request.

    The GUI is started as a separate process from a worker thread, so this
    handler returns immediately.

    Returns:
        A ``(body, status)`` tuple: a status dict and HTTP code 200.
    """
    def launch():
        print("Launching PyQt5 GUI...")
        # run() blocks this worker thread until the GUI exits, which reaps
        # the child instead of leaving a zombie process behind per request.
        subprocess.run(["python3", "/home/kelk/ADLC_improved/display.py"])

    # Daemon thread: a GUI that is still open must not keep the server
    # process from shutting down.
    threading.Thread(target=launch, daemon=True).start()
    return {"status": "GUI launched"}, 200
if __name__ == '__main__':
    # Serve on the loopback interface only, port 5001.
    app.run(port=5001, host='127.0.0.1')
This is the display routine that waits for streaming data from the Pi
import sys, json, subprocess, threading
from PyQt5.QtWidgets import QMainWindow, QApplication, QPushButton
from PyQt5.QtCore import QTimer
from adlc_ui import Ui_MainWindow
# Select the "Fusion" widget style. This is called on the QApplication class
# itself, before the application instance is created in the __main__ block.
QApplication.setStyle("Fusion")
class Setup(QMainWindow, Ui_MainWindow):
    """Main window showing the analog and digital values streamed from the Pi.

    A background thread runs the streamer script on the Pi over ssh and
    stores the most recent decoded sample in ``latest_data``. A QTimer on
    the GUI thread periodically copies that sample into the widgets, so no
    widget is ever touched from the background thread.
    """

    ANALOG_CHANNELS = 4
    DIGITAL_CHANNELS = 8
    REFRESH_INTERVAL_MS = 500  # how often update_ui() runs
    RECONNECT_DELAY_S = 5      # pause before re-running ssh after a drop

    BUTTON_STYLE = """
        QPushButton {
            background-color: transparent;
            color: black;
            border: 2px solid gray;
            border-radius: 4px;
        }
        QPushButton:disabled {
            background-color: lightgray;
            color: yellow;
        }
    """
    ON_STYLE = "background-color: green; color: blue; "
    OFF_STYLE = "background-color: cyan; color: gray; "

    def __init__(self, parent=None):
        """Build the UI, start the refresh timer and start the ssh reader."""
        super().__init__(parent)
        self.setupUi(self)
        # Placeholder sample shown until the first line arrives from the Pi.
        self.latest_data = {
            "analog": [0] * self.ANALOG_CHANNELS,
            "digital": [0] * self.DIGITAL_CHANNELS,
        }
        self.setdata()
        # Update the Pi host/path as needed.
        self.start_ssh_stream("scarelo@raspberrypi", "/home/scarelo/pi_streamer.py")

    def _analog_displays(self):
        """Return the four analog LCD widgets in channel order."""
        return [self.AOut0, self.AOut1, self.AOut2, self.AOut3]

    def setdata(self):
        """Size and style the widgets, then start the periodic UI refresh."""
        button_width = 80
        button_height = 25
        # DOut0..7 and DIn0..7 get the same initial size, text and style.
        for prefix in ("DOut", "DIn"):
            for i in range(self.DIGITAL_CHANNELS):
                button = getattr(self, f"{prefix}{i}")
                button.setFixedSize(button_width, button_height)
                button.setEnabled(True)
                button.setText('OFF')
                button.setStyleSheet(self.BUTTON_STYLE)

        # DOut0 starts out shown as ON; the first refresh overwrites it.
        self.DOut0.setStyleSheet(self.ON_STYLE)
        self.DOut0.setText("ON")

        new_width = 120
        new_height = 40
        for lcd in self._analog_displays():
            geo = lcd.geometry()
            lcd.setGeometry(geo.x(), geo.y(), new_width, new_height)

        self.rx_timer = QTimer(self)
        self.rx_timer.timeout.connect(self.update_ui)
        self.rx_timer.start(self.REFRESH_INTERVAL_MS)

    def update_ui(self):
        """Copy the most recent sample into the LCDs and DOut indicators."""
        # Read the attribute once: the reader thread rebinds it to a new
        # dict, so a single local reference is a consistent snapshot.
        data = self.latest_data
        for lcd, value in zip(self._analog_displays(), data["analog"]):
            lcd.display(value)
        for i, value in enumerate(data["digital"][:self.DIGITAL_CHANNELS]):
            dout = getattr(self, f"DOut{i}")
            if value:
                dout.setStyleSheet(self.ON_STYLE)
                dout.setText("ON")
            else:
                dout.setStyleSheet(self.OFF_STYLE)
                dout.setText("OFF")

    def _decode_sample(self, line):
        """Return the sample dict decoded from *line*, or None if unusable.

        A usable sample is a JSON object whose "analog" and "digital" values
        are lists with at least as many entries as the UI has channels.
        Anything else (for example ssh diagnostics, which share the stream)
        is reported on stdout and rejected so update_ui() never sees it.
        """
        try:
            sample = json.loads(line)
        except json.JSONDecodeError:
            print("Invalid JSON:", line)
            return None
        is_object = isinstance(sample, dict)
        analog = sample.get("analog") if is_object else None
        digital = sample.get("digital") if is_object else None
        if (
            not isinstance(analog, list)
            or len(analog) < self.ANALOG_CHANNELS
            or not isinstance(digital, list)
            or len(digital) < self.DIGITAL_CHANNELS
        ):
            print("Unexpected sample format:", line)
            return None
        return sample

    def start_ssh_stream(self, pi_host, script_path_on_pi):
        """Start a daemon thread that streams samples from the Pi over ssh.

        The thread runs ``python3 -u <script_path_on_pi>`` on *pi_host*,
        stores every valid JSON line in ``latest_data`` and reconnects after
        a pause whenever the ssh process ends or fails to start.

        Args:
            pi_host: ssh destination, e.g. "user@host" or a config alias.
            script_path_on_pi: path of the streamer script on the Pi.
        """
        # Imported here because the module-level imports do not include it.
        import time

        cmd = ["ssh", pi_host, f"python3 -u {script_path_on_pi}"]

        def stream():
            while True:
                try:
                    print(f"Connecting to {pi_host}...")
                    # The context manager closes the pipe and reaps ssh when
                    # the stream ends, so reconnects do not pile up zombies.
                    with subprocess.Popen(
                        cmd,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT,
                        text=True,
                    ) as process:
                        for raw_line in process.stdout:
                            line = raw_line.strip()
                            if not line:
                                continue
                            sample = self._decode_sample(line)
                            if sample is not None:
                                self.latest_data = sample
                except Exception as e:
                    # Thread boundary: report and retry rather than let the
                    # reader thread die silently.
                    print("SSH stream error:", e)
                print(f"SSH connection lost. Reconnecting in {self.RECONNECT_DELAY_S} seconds...")
                # A plain sleep is fine: this is the worker thread, so the
                # GUI event loop is not blocked.
                time.sleep(self.RECONNECT_DELAY_S)

        thread = threading.Thread(target=stream, daemon=True)
        thread.start()
if __name__ == '__main__':
    # Create the Qt application, show the main window, and hand the event
    # loop's exit status back to the shell.
    qt_app = QApplication(sys.argv)
    main_window = Setup()
    main_window.show()
    exit_status = qt_app.exec()
    sys.exit(exit_status)
Running on the pi pi_streamer.py
import pigpio
import json
import time
import random # Import the random module to generate random values
# Handle to the pigpio daemon. NOTE(review): the sample values produced below
# are randomly generated rather than read through this handle - presumably a
# stand-in for real GPIO reads; confirm.
pi = pigpio.pi()
# One entry per digital channel (presumably GPIO numbers - confirm); the
# streaming loop emits one digital value per entry.
DIGITAL_PINS = [4, 17, 27, 22, 5, 6, 13, 19]
def get_random_analog_value(low=9.0, high=10.0, step=0.2):
    """Return a random value between *low* and *high*, snapped to *step*.

    With the defaults the result is one of 9.0, 9.2, 9.4, 9.6, 9.8 or 10.0.
    If *low* or *high* is not a multiple of *step*, snapping can land up to
    half a step outside the range.

    Args:
        low: lower bound of the random draw.
        high: upper bound of the random draw.
        step: spacing of the values the draw is snapped to; must be > 0.

    Raises:
        ValueError: if *step* is not positive.
    """
    if step <= 0:
        raise ValueError("step must be positive")
    steps = round(random.uniform(low, high) / step)
    # steps * step carries binary floating-point noise (46 * 0.2 is
    # 9.200000000000001); rounding again yields the clean value 9.2 so the
    # JSON output and the displays show tidy numbers.
    return round(steps * step, 10)
# Emit one JSON sample per line, twice a second, until interrupted.
try:
    while True:
        # Generate a random value (0 or 1) for each digital pin
        digital = [random.randint(0, 1) for _ in DIGITAL_PINS]
        # Simulate 4 analog values between 9 and 10, in steps of 0.2
        analog = [get_random_analog_value() for _ in range(4)]
        # flush=True pushes each sample out immediately instead of letting
        # it sit in the output buffer.
        print(json.dumps({"digital": digital, "analog": analog}), flush=True)
        time.sleep(0.5)
except KeyboardInterrupt:
    # Exit quietly: stdout carries only JSON lines, so no farewell message.
    pass
finally:
    pi.stop()  # Disconnect from pigpio daemon