Here's what I get:
#!/usr/bin/env python3
"""
Fetch hourly forecast.solar data (UTC) and write to InfluxDB 2.x.
It writes one point per timestamp with fields:
- watts (instantaneous power W)
- watt_hours (cumulative Wh)
- watt_hours_day (daily Wh; only on YYYY-MM-DD entries)
Tags include: site, lat, lon, azimuth, declination, kwp, source="forecast.solar".
Configuration is via environment variables (see below).
"""
import os
import sys
import time
import json
import math
from datetime import datetime, timezone
import requests
from influxdb_client import InfluxDBClient, Point, WriteOptions
# ----------------------------
# Configuration (env vars)
# ----------------------------
FS_API_KEY = os.getenv("FS_API_KEY", "").strip() # optional; public API works without a key (1 hr resolution, today+tomorrow)
FS_LAT = os.getenv("FS_LAT") # required
FS_LON = os.getenv("FS_LON") # required
FS_DECL = os.getenv("FS_DECL") # panel tilt / declination in degrees (e.g. 37)
FS_AZIMUTH = os.getenv("FS_AZIMUTH") # 0 = south, 90 = west, 180 = north, 270 = east (Forecast.Solar uses 0=south)
FS_KWP = os.getenv("FS_KWP") # system size in kWp (e.g. 4.2)
# InfluxDB 2.x
INFLUX_URL = os.getenv("INFLUX_URL", "http://localhost:8086")
INFLUX_TOKEN = os.getenv("INFLUX_TOKEN") # required
INFLUX_ORG = os.getenv("INFLUX_ORG", "primary") # required-ish
INFLUX_BUCKET= os.getenv("INFLUX_BUCKET", "solar") # required-ish
# Optional niceties
SITE_TAG = os.getenv("SITE_TAG", "home") # tag to identify the site
MEASUREMENT = os.getenv("MEASUREMENT", "solar_forecast")
USER_AGENT = os.getenv("USER_AGENT", "forecast-solar-influx/1.0")
# Timeout & retry
HTTP_TIMEOUT = float(os.getenv("HTTP_TIMEOUT", "10"))
RETRIES = int(os.getenv("RETRIES", "2"))
BACKOFF_S = float(os.getenv("BACKOFF_S", "1.5"))
def _env_must(name):
v = os.getenv(name)
if not v:
sys.stderr.write(f"Missing env var: {name}\n")
sys.exit(2)
return v
def _validate_inputs():
# Required numeric params
for name in ("FS_LAT","FS_LON","FS_DECL","FS_AZIMUTH","FS_KWP"):
_ = _env_must(name)
# Influx 2.x requirements
_env_must("INFLUX_TOKEN")
# org & bucket can default, but ensure not empty
if not INFLUX_ORG:
sys.stderr.write("INFLUX_ORG is empty\n"); sys.exit(2)
if not INFLUX_BUCKET:
sys.stderr.write("INFLUX_BUCKET is empty\n"); sys.exit(2)
def build_api_url():
"""
Forecast.Solar format:
https://api.forecast.solar/[API_KEY/]<route>/<lat>/<lon>/<decl>/<azimuth>/<kwp>?time=utc
Public (no key) is allowed with hourly resolution (today+tomorrow).
We explicitly request UTC timestamps.
"""
base = "https://api.forecast.solar"
route = "estimate"
path = f"{float(FS_LAT):.6f}/{float(FS_LON):.6f}/{int(float(FS_DECL))}/{int(float(FS_AZIMUTH))}/{float(FS_KWP):.3f}"
if FS_API_KEY:
url = f"{base}/{FS_API_KEY}/{route}/{path}"
else:
url = f"{base}/{route}/{path}"
# force UTC timestamps
return f"{url}?time=utc"
def http_get_json(url):
headers = {"User-Agent": USER_AGENT, "Accept": "application/json"}
last_err = None
for attempt in range(1, RETRIES+2):
try:
r = requests.get(url, headers=headers, timeout=HTTP_TIMEOUT)
if r.status_code == 429:
# rate limited: back off a bit
time.sleep(BACKOFF_S * attempt)
continue
r.raise_for_status()
return r.json()
except Exception as e:
last_err = e
time.sleep(BACKOFF_S * attempt)
raise RuntimeError(f"Failed to GET {url}: {last_err}")
def parse_timeseries(result_obj):
"""
result_obj looks like:
{
"watts": {
"2019-06-22 05:00:00": 0, ...
},
"watt_hours": { ... cumulative ... },
"watt_hours_day": { "2019-06-22": 2626, ... }
}
Timestamps are strings in 'YYYY-MM-DD HH:MM:SS' and (with ?time=utc) are in UTC.
Returns list of dicts:
{"ts": datetime(UTC), "watts": int, "watt_hours": int}
And a dict for day totals keyed by YYYY-MM-DD.
"""
watts = result_obj.get("watts", {}) or {}
wh = result_obj.get("watt_hours", {}) or {}
wh_day= result_obj.get("watt_hours_day", {}) or {}
rows = []
for ts_str, w in watts.items():
# e.g. "2019-06-22 05:00:00"
dt = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
rows.append({
"ts": dt,
"watts": int(w) if w is not None else None,
"watt_hours": int(wh.get(ts_str)) if ts_str in wh and wh.get(ts_str) is not None else None
})
# Sort by time just in case
rows.sort(key=lambda r: r["ts"])
return rows, wh_day
def write_points_influx(rows, day_totals):
client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
write = client.write_api(write_options=WriteOptions(batch_size=500, flush_interval=5_000))
# Static tags
base_tags = {
"site": SITE_TAG,
"lat": f"{float(FS_LAT):.6f}",
"lon": f"{float(FS_LON):.6f}",
"declination": str(int(float(FS_DECL))),
"azimuth": str(int(float(FS_AZIMUTH))),
"kwp": f"{float(FS_KWP):.3f}",
"source": "forecast.solar"
}
points = []
for r in rows:
p = (
Point(MEASUREMENT)
.tag("site", base_tags["site"])
.tag("lat", base_tags["lat"])
.tag("lon", base_tags["lon"])
.tag("declination", base_tags["declination"])
.tag("azimuth", base_tags["azimuth"])
.tag("kwp", base_tags["kwp"])
.tag("source", base_tags["source"])
.time(r["ts"])
)
if r["watts"] is not None:
p = p.field("watts", int(r["watts"]))
if r["watt_hours"] is not None:
p = p.field("watt_hours", int(r["watt_hours"]))
points.append(p)
# Also write day totals as separate points (same measurement) with a "day_total" field
for day_str, val in day_totals.items():
try:
dt = datetime.strptime(day_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)
except Exception:
continue
p = (
Point(MEASUREMENT)
.tag("site", base_tags["site"])
.tag("lat", base_tags["lat"])
.tag("lon", base_tags["lon"])
.tag("declination", base_tags["declination"])
.tag("azimuth", base_tags["azimuth"])
.tag("kwp", base_tags["kwp"])
.tag("source", base_tags["source"])
.time(dt)
.field("watt_hours_day", int(val))
)
points.append(p)
if points:
write.write(bucket=INFLUX_BUCKET, org=INFLUX_ORG, record=points)
write.close()
client.close()
def main():
_validate_inputs()
url = build_api_url()
data = http_get_json(url)
# Forecast.Solar wraps the payload as {"result": {...}, "message": {...}}
result = (data or {}).get("result")
if not result:
msg = (data or {}).get("message", {})
raise SystemExit(f"No 'result' in response. message={msg}")
rows, day_totals = parse_timeseries(result)
if not rows and not day_totals:
raise SystemExit("No data parsed from API response.")
write_points_influx(rows, day_totals)
print(f"Wrote {len(rows)} hourly points and {len(day_totals)} day totals to InfluxDB bucket '{INFLUX_BUCKET}'.")
if __name__ == "__main__":
main()
Install
python3 -m venv .venv
. .venv/bin/activate
pip install requests influxdb-client
Export config example
export FS_LAT=51.5074
export FS_LON=-0.1278
export FS_DECL=37
export FS_AZIMUTH=0
export FS_KWP=4.2
# optional key if you have a paid plan; omit for public endpoint
# export FS_API_KEY=your_api_key
export INFLUX_URL=http://localhost:8086
export INFLUX_TOKEN=YOUR_INFLUXDB_TOKEN
export INFLUX_ORG=your_org
export INFLUX_BUCKET=solar
export SITE_TAG=roof
Run it
python forecast_solar_to_influx.py
Schedule it with cron/systemd to run hourly (public API updates hourly; paid tiers can be finer).
Measurement & schema
measurement: solar_forecast (override with MEASUREMENT)
tags: site, lat, lon, declination, azimuth, kwp, source=forecast.solar
fields:
watts (int)
watt_hours (int; cumulative)
watt_hours_day (int; only on daily points)
All timestamps are written as UTC. The script requests ?time=utc and treats incoming timestamps as UTC when parsing.
Notes / gotchas
Public vs API key: Public endpoint (no key) returns hourly data for today + tomorrow. If you add an API key, you can unlock longer horizons and finer resolution (depending on plan). The URL builder handles both.
Rate limiting: If you see HTTP 429, the script backs off and retries. Don’t schedule faster than the API updates (public: hourly).
Azimuth convention: Forecast.Solar uses 0° = south, 90° = west, 180° = north, 270° = east.
It'll be interesting to see if this is what you get!
InfluxDB 1.x?
If you’re on InfluxDB 1.x, install influxdb and swap the writer in write_points_influx() to use InfluxDBClient.write_points() with line protocol. I can paste a ready 1.x variant if you want it.