# SMS weather bot script.
# (File-viewer header from the original listing: 231 lines, 5.8 KiB, Python.)
import os
|
|
import time
|
|
import json
|
|
import requests
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
# ===== SMS GATEWAY =====
# Gateway credentials come from the environment; the literals are fallbacks
# used when the variables are unset.
SMS_USER = os.getenv("SMS_USER", "weatherbot")
# SECURITY(review): a real-looking password is committed here as the fallback.
# It is kept so deployments that rely on it keep working, but it should be
# rotated and supplied only through the SMS_PASS environment variable.
SMS_PASS = os.getenv("SMS_PASS", "Cupakabra17")

RECEIVE_URL = "https://sms.internet-master.cz/receive/"
SEND_URL = "https://sms.internet-master.cz/send/"

# ===== TIMING =====
POLL_INTERVAL = 1  # seconds (low latency)

# ===== STATE =====
# Relative path: the state file is resolved against the working directory
# the script is started from.
STATE_FILE = Path("weatherbot_state.json")
# Upper bound on how many processed message offsets are persisted.
MAX_SEEN_OFFSETS = 5000

# ===== OPEN-METEO =====
GEO_URL = "https://geocoding-api.open-meteo.com/v1/search"
WEATHER_URL = "https://api.open-meteo.com/v1/forecast"

# city -> (lat, lon)
GEO_CACHE = {}
|
|
|
|
# Default headers attached to every outgoing request so the bot identifies
# itself to the services it calls.
HEADERS = {"User-Agent": "weatherbot/1.0 (jakub@jimbuntu)"}

# One shared session, used by every HTTP call in this script.
session = requests.Session()
session.headers.update(HEADERS)
|
|
|
|
# =========================================================
|
|
# SMS API
|
|
# =========================================================
|
|
|
|
def fetch_inbox(limit=5):
    """Fetch received messages from the SMS gateway.

    Issues a GET request to ``RECEIVE_URL`` carrying the gateway credentials
    and ``limit`` as query parameters, with a 5-second timeout.

    Args:
        limit: Value forwarded to the gateway as the ``limit`` parameter.

    Returns:
        The decoded JSON body of the response. Callers in this script read
        its ``"inbox"`` key.

    Raises:
        Whatever ``session.get``, ``raise_for_status()`` or ``json()`` raise
        (network failure, HTTP error status, undecodable body).
    """
    query = {
        "username": SMS_USER,
        "password": SMS_PASS,
        "limit": limit,
    }
    response = session.get(RECEIVE_URL, params=query, timeout=5)
    response.raise_for_status()
    return response.json()
|
|
|
|
def send_sms(number, message):
    """Send one SMS through the gateway and print the gateway's answer.

    Issues a GET request to ``SEND_URL`` with the recipient, the text and the
    gateway credentials as query parameters, with a 5-second timeout.

    Args:
        number: Recipient phone number, passed through unchanged.
        message: Text of the SMS, passed through unchanged.

    Returns:
        None.

    Raises:
        Whatever ``session.get`` or ``raise_for_status()`` raise (network
        failure, HTTP error status).
    """
    query = {
        "number": number,
        "message": message,
        "username": SMS_USER,
        "password": SMS_PASS,
    }
    response = session.get(SEND_URL, params=query, timeout=5)
    response.raise_for_status()
    print("📤 SEND:", response.text)
|
|
|
|
# =========================================================
|
|
# WEATHER (Open-Meteo, no registration)
|
|
# =========================================================
|
|
|
|
# Short, SMS-friendly labels for the weather codes returned by the forecast
# API. Codes missing from this table are rendered with a generic fallback by
# the lookup in get_weather().
CODE_MAP = {
    # Sky condition
    0: "Clear",
    1: "Mostly clear",
    2: "Partly cloudy",
    3: "Cloudy",
    # Fog
    45: "Fog",
    48: "Fog",
    # Drizzle
    51: "Drizzle",
    53: "Drizzle",
    55: "Drizzle",
    56: "Freezing drizzle",
    57: "Freezing drizzle",
    # Rain
    61: "Rain",
    63: "Rain",
    65: "Heavy rain",
    66: "Freezing rain",
    67: "Freezing rain",
    # Snow
    71: "Snow",
    73: "Snow",
    75: "Heavy snow",
    77: "Snow grains",
    # Showers
    80: "Showers",
    81: "Showers",
    82: "Heavy showers",
    85: "Snow showers",
    86: "Heavy snow showers",
    # Thunderstorms
    95: "Storm",
    96: "Hail storm",
    99: "Hail storm",
}
|
|
|
|
def get_weather(city):
    """Build a short forecast message (up to three days) for ``city``.

    The city name is geocoded through ``GEO_URL`` (results are cached in
    ``GEO_CACHE`` under the lower-cased name), then the daily mean
    temperature and weather code are requested from ``WEATHER_URL``.

    Args:
        city: Free-text city name, typically the body of an incoming SMS.

    Returns:
        A message string in every case:
        - a prompt to send a city name when ``city`` is blank,
        - an "unknown city" message when geocoding finds nothing,
        - a "weather unavailable" message when the forecast has no usable
          daily data,
        - a "service error" message when any lookup step fails,
        - otherwise a header line followed by one line per day
          (weekday abbreviation, rounded temperature, description).
    """
    city = city.strip()
    if not city:
        return "❌ Send a city name."

    try:
        key = city.lower()

        # --- GEO ---
        coords = GEO_CACHE.get(key)
        if coords is None:
            r = session.get(
                GEO_URL,
                params={"name": city, "count": 1},
                timeout=2,
            )
            r.raise_for_status()
            results = r.json().get("results")
            if not results:
                return f"❌ Unknown city: {city}"

            coords = (results[0]["latitude"], results[0]["longitude"])
            GEO_CACHE[key] = coords
        lat, lon = coords

        # --- WEATHER ---
        r = session.get(
            WEATHER_URL,
            params={
                "latitude": lat,
                "longitude": lon,
                "daily": "temperature_2m_mean,weathercode",
                "timezone": "auto",
            },
            timeout=2,
        )
        r.raise_for_status()

        daily = r.json().get("daily") or {}
        dates = (daily.get("time") or [])[:3]
        temps = (daily.get("temperature_2m_mean") or [])[:3]
        codes = (daily.get("weathercode") or [])[:3]

        # Without all three series there is nothing to show; a bare header
        # line would be a useless SMS.
        if not (dates and temps and codes):
            return f"❌ Weather unavailable for {city}"

        lines = [f"🌦 {city}:"]
        for d, t, c in zip(dates, temps, codes):
            wd = datetime.strptime(d, "%Y-%m-%d").strftime("%a")
            desc = CODE_MAP.get(c, "Weather")
            # A single missing value must not discard the whole forecast.
            temp = f"{round(t)}°C" if t is not None else "n/a"
            lines.append(f"{wd}: {temp} {desc}")

        return "\n".join(lines)

    except Exception as e:
        # The sender always gets a reply; the cause is printed so that
        # failures can be diagnosed from the bot's output.
        print("⚠ Weather lookup failed:", e)
        return f"⚠ Weather service error for {city}"
|
|
|
|
# =========================================================
|
|
# STATE HANDLING
|
|
# =========================================================
|
|
|
|
def load_state(path=None):
    """Load the persisted bot state from disk.

    Args:
        path: Optional location of the state file. Defaults to
            ``STATE_FILE`` when omitted.

    Returns:
        A dict that always has a ``"seen_offsets"`` list. A fresh
        ``{"seen_offsets": []}`` is returned when the file is missing,
        unreadable, not valid JSON, or does not contain a JSON object.
        When the stored ``"seen_offsets"`` is not a list it is replaced by
        an empty list, and nested lists/objects inside it are dropped
        because callers put the offsets into a ``set``.
    """
    state_path = STATE_FILE if path is None else Path(path)

    try:
        with state_path.open("r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # First run: no state has been saved yet.
        return {"seen_offsets": []}
    except (OSError, ValueError) as e:
        # ValueError covers both invalid JSON and undecodable bytes.
        print("⚠ State file unreadable, starting fresh:", e)
        return {"seen_offsets": []}

    if not isinstance(data, dict):
        return {"seen_offsets": []}

    offsets = data.get("seen_offsets")
    if isinstance(offsets, list):
        data["seen_offsets"] = [
            o for o in offsets if not isinstance(o, (list, dict))
        ]
    else:
        data["seen_offsets"] = []
    return data
|
|
|
|
def save_state(state, path=None, max_offsets=None):
    """Persist ``state`` as JSON, keeping only the newest seen offsets.

    The data is written to a sibling ``.tmp`` file first and then moved over
    the target, so a failed write does not leave a truncated state file.

    Args:
        state: Dict to store. When its ``"seen_offsets"`` list is longer
            than the limit, the key is rebound to a new list holding only
            the last ``limit`` entries (the passed-in list object itself is
            not modified).
        path: Optional location of the state file. Defaults to
            ``STATE_FILE`` when omitted.
        max_offsets: Optional cap on stored offsets. Defaults to
            ``MAX_SEEN_OFFSETS`` when omitted.

    Raises:
        OSError / TypeError / ValueError from writing or serialising; the
        temporary file is removed before the error propagates.
    """
    limit = MAX_SEEN_OFFSETS if max_offsets is None else max_offsets
    target = STATE_FILE if path is None else Path(path)

    seen = state.get("seen_offsets", [])
    if len(seen) > limit:
        # Slice from an explicit start index: ``seen[-limit:]`` would keep
        # everything when limit is 0.
        state["seen_offsets"] = seen[len(seen) - limit:]

    tmp = target.with_suffix(".tmp")
    try:
        with tmp.open("w", encoding="utf-8") as f:
            json.dump(state, f, ensure_ascii=False)
        tmp.replace(target)
    except Exception:
        # Best-effort cleanup of the half-written temp file; the original
        # error is what the caller needs to see.
        try:
            tmp.unlink()
        except OSError:
            pass
        raise
|
|
|
|
def prime_seen_offsets(seen_set):
    """Mark the messages currently in the inbox as already handled.

    Fetches up to 20 inbox entries and adds every non-``None`` ``offset``
    to ``seen_set`` (mutated in place), so that messages already present
    when the bot starts are not answered.

    This is best-effort: any exception is printed and swallowed, leaving
    ``seen_set`` with whatever was added before the failure.
    """
    try:
        inbox = fetch_inbox(limit=20).get("inbox", [])
        for entry in inbox:
            marker = entry.get("offset")
            if marker is None:
                continue
            seen_set.add(marker)
        print("🧹 Inbox primed (no backlog replies).")
    except Exception as e:
        print("⚠ Prime failed:", e)
|
|
|
|
# =========================================================
|
|
# MAIN LOOP
|
|
# =========================================================
|
|
|
|
print("📡 SMS Weather Bot started (Open-Meteo, no registration)")

state = load_state()

# seen_list keeps the processed offsets in order (oldest first) so that
# trimming drops the oldest ones; seen_set mirrors it for fast membership
# checks. dict.fromkeys removes duplicates while preserving order.
seen_list = state.get("seen_offsets")
if not isinstance(seen_list, list):
    seen_list = []
seen_list = list(dict.fromkeys(seen_list))
seen_set = set(seen_list)

# Treat everything already in the inbox as handled, and record those offsets
# in the ordered list as well so the persisted state keeps them.
prime_seen_offsets(seen_set)
seen_list.extend(seen_set.difference(seen_list))
state["seen_offsets"] = seen_list
save_state(state)

while True:
    try:
        data = fetch_inbox(limit=5)

        for sms in data.get("inbox", []):
            offset = sms.get("offset")
            if offset is None or offset in seen_set:
                continue

            phone = sms.get("phone", "")
            text = (sms.get("message") or "").strip()

            # Record the message as handled *before* replying: if the reply
            # fails, the same SMS is not answered again on the next poll.
            seen_set.add(offset)
            seen_list.append(offset)

            # Keep the in-memory bookkeeping bounded, dropping oldest first.
            excess = len(seen_list) - MAX_SEEN_OFFSETS
            if excess > 0:
                seen_set.difference_update(seen_list[:excess])
                del seen_list[:excess]

            state["seen_offsets"] = seen_list
            save_state(state)

            print(f"📨 {phone}: {text}")

            if not phone:
                # Nothing to reply to; skip the lookup and the send attempt.
                print("⚠ No sender number, reply skipped")
                continue

            reply = get_weather(text)
            send_sms(phone, reply)

            print(f"✅ Replied to {phone}")

    except Exception as e:
        # Keep the bot alive across transient gateway/network failures.
        print("⚠ ERROR:", e)

    time.sleep(POLL_INTERVAL)
|