Get Weather Forecast
For the heating control, you might be interested in the outside weather. Instead of own sensors, I utilize a public weather forecast service. Especially, if you intend to control the heating in dependency of the todays sun shine, you are interested in the sun shine prediction. The German weather service DWD offers weather prediction data for many towns for free.
My /etc/crontab to execute the script to update the weather forecast 4 times per day looks like:
SHELL=/bin/sh PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin # m h dom mon dow user command 16 4,10,16,22 * * * www-data ( cd /var/www/html/HVC && ./HvcWeather.py > /tmp/cron.log 2>&1 )
Usage of weather data
The provided kml file contains purely the prediction data. To archive the weather data, I write a logfile per day and spend some effort to merge the new prediction with the conserved past data.
Instead of an actual sensor, I interpolate the weather to the current point in time. Additionaly I plot the weather prediction:
Python interface
#!/usr/local/bin/python3.8 """ /usr/bin/wget –no-check-certificate https://opendata.dwd.de/weather/local_foreca sts/mos/MOSMIX_L/single_stations/L648/kml/MOSMIX_L_LATEST_L648.kmz -O /var/www/h tml/HVC/altenstadt.kmz cd /var/www/html/HVC/ && /usr/bin/unzip altenstadt.kmz for f in /var/www/html/HVC/MOSMIX*; do mv "$f" /var/www/html/HVC/altenstadt.kml done """ import os import xml.etree.ElementTree as ET from math import exp import copy class HvcWeather: """ Instead of ambient sensors, utilize some public weather forecast to have an estimate of the sun shine today. In Germany, you get the data for you local region from the Deutscher Wetterdienst: https://wettwarn.de/mosmix/mosmix.html https://opendata.dwd.de/weather/local_forecasts/mos/MOSMIX_L/single_stations/L648/kml/MOSMIX_L_LATEST_L648.kmz You may run this regularly by adding the line: sudo nano /etc/crontab 16 4,10,16,22 * * * www-data ( cd /var/www/html/HVC && ./HvcWeather.py > /tmp/cron.log 2>&1 ) Dr. Arne Jachens 2020-06-22 """ def __init__(self, source, debug=False): """ Select the variables, you are interested in. The KML file contains much more information then we need, let's find the relevant data... Data content specification: https://opendata.dwd.de/weather/lib/MetElementDefinition.xml TTT Temperature 2m above surface, K Td Dewpoint 2m above surface, K FX3 Maximum wind gust within the last hour, m/s RR1c Rain-Equivalent during the last 1 hours kg/m2 RRS3c Snow-Rain-Equivalent during the last 3 hours kg/m2, nur im Winter Neff Effective cloud cover, % (0..100) PPPP Surface pressure, Pa Rad1h Global Irradiance, kJ/m2 FXh55 Probability of wind gusts >= 55kn within the last 12 hours, % (0..100) SunD Yesterdays total sunshine duration, s SunD3 Sunshine duration during the last three hours, s To test the stuff, set debug = True. """ self.debug = debug self.source=source self.vars = {} self.scale = {} self.unit = {} self.vars["TAmb"]="TTT" self.scale["TAmb"]=1 self.unit["TAmb"]="C" self.vars["Tdp"]="Td" self.scale["Tdp"]=1 self.unit["Tdp"]="C" self.vars["p"]="PPPP" self.scale["p"]=0.001 self.unit["p"]="kPa" self.vars["clouds"]="Neff" self.scale["clouds"]=1 self.unit["clouds"]="%" self.vars["I"]="Rad1h" self.scale["I"]=1/3.6 self.unit["I"]="W/m2" self.vars["rain"]="RR1c" self.scale["rain"]=1 self.unit["rain"]="mm/m2" self.vars["windMax"]="FX3" self.scale["windMax"]=1 self.unit["windMax"]="m/s" self.vars["storm"]="FXh55" self.scale["storm"]=1/100 self.unit["storm"]="%" self.unit["phi"] = "%" self.unit["x"] = "g/kg" def pull(self): """ Utilize wget to retrieve the actual weather forecast. """ #utilize wget cmd = "/usr/bin/wget –no-check-certificate " + self.source myLocation = self.source[ self.source.rfind("_")+1: ] cmd = cmd + " -O " + myLocation os.system(cmd) #extract archive cmd = "/usr/bin/unzip " + myLocation os.system(cmd) #rename the file kmlfile = myLocation[:myLocation.find(".")] + ".kml" cmd = "mv MOSMIX*.kml " + kmlfile print(cmd) os.system(cmd) return kmlfile """ Download the document specifications and put them to http://localhost/HVC/ : /usr/bin/wget http://www.opengis.net/kml/2.2 -O ogckml22.xsd /usr/bin/wget https://opendata.dwd.de/weather/lib/pointforecast_dwd_extension_V1_0.xsd """ def parse(self,kmlFile): """ The KML is a specific XML format, use etree to extract the content. The variables of interest are extrected, some units changed, and finally derived humidity data is computed. """ namespaces = {'kml': 'http://www.opengis.net/kml/2.2', 'dwd': 'https://opendata.dwd.de/weather/lib/pointforecast_dwd_extension_V1_0.xsd'} #namespaces = {'kml': './ogckml22.xsd', 'dwd': './pointforecast_dwd_extension_V1_0.xsd'} if os.path.exists(kmlFile): xmlTree = ET.parse(kmlFile) xmlRoot = xmlTree.getroot() else: print(fileName,' does not exist') quit() #time steps for the prediction tag = 'kml:Document/kml:ExtendedData/dwd:ProductDefinition/dwd:ForecastTimeSteps/dwd:TimeStep' timeStepsXml = xmlRoot.findall(tag,namespaces) timeSteps = [] for ts in timeStepsXml: thisText = ts.text timeSteps.append(thisText[:thisText.find(".")]) #select all Forcast elements tag = 'kml:Document/kml:Placemark/kml:ExtendedData' ExtendedData = xmlRoot.findall(tag,namespaces) HA = HumidAir() #put data sets of interest in 'data' structure data={} varNames = self.vars.keys() for v in varNames: elName = self.vars[v] for ed in ExtendedData: thisXML = ed.find("./dwd:Forecast[@dwd:elementName='"+elName+"']",namespaces) data[v] = thisXML.find("./dwd:value",namespaces).text.split() #split weather forecast in days an do some basic conversions lastDate="" weather = {} for i in range(len(timeSteps)-1): t = timeSteps[i] date = t[:t.find("T")] time = t[t.find("T")+1:] time = time[:time.rfind(":")] # HH:MM if date!=lastDate: lastDate=date weather[date] = {} weather[date][time] = {} varNames = self.vars.keys() for v in varNames: val = data[v][i] if val=="-": val = -999 elif self.unit[v] == "C": val = float(val) - 273.15 else: val = self.scale[v] * float(val) weather[date][time][v] = val #derived data TAmb = weather[date][time]["TAmb"] Tdp = weather[date][time]["Tdp"] p = weather[date][time]["p"]*1000.0 rhoAir = HA.densityAir(TAmb+273.15, p) phi = HA.relHumidity(Tdp+273.15, TAmb+273.15) rhoW = HA.absHumidity(TAmb+273.15, phi) x = HA.specHumidity(rhoW,rhoAir) weather[date][time]['phi'] = phi*100.0 # 0...100% weather[date][time]['x'] = x*1000.0 # g/kg return weather def read(self,today): """ Read weather data of today, to replace updated data. """ myWeather={} myWeather[today]={} month = today[0:7] fileName = month+"/"+"weather_"+today+".log" if os.path.exists(fileName): with open(fileName,"r") as fid: lines = fid.readlines() varNames = lines[0].split() varNames[0] = varNames[0][1:] for i,l in enumerate(lines): if i>0: values = l.split() hour = values[0] myWeather[today][hour]={} for j,v in enumerate(values): if j>0: myWeather[today][hour][varNames[j]] = float(v) else: print("file "+fileName+" not found!") return myWeather def write(self,weather): """ write weather data to files, one per day """ message = "write()\n" dates = list(weather.keys()) dates.sort() """ try: varNames = list(weather[dates[0]]['23:00'].keys()) varNames.sort() except: print("tryed to access ",dates[0]) """ varNames = self.unit.keys() message = message + str(varNames) for i,d in enumerate(dates): if i<7: today = dates[i] todaysWeather = weather[today] msg = self.__writeTodaysWeather(today,todaysWeather,varNames) message = message + msg return message def __writeTodaysWeather(self,today,weather,varNames): message = "" month = today[0:7] #create logDirectory, if it does not exist yet try: os.mkdir(month) cmd = "chown arne:www-data "+month os.system(cmd) cmd = "chmod g+w "+month os.system(cmd) message = message + "directory "+month+" created \n" except FileExistsError: pass fileName = month+"/"+"weather_"+today+".log" message = message + "file: " + fileName with open(fileName,"w") as fid: string = " #time" for v in varNames: string = string + "\t"+v fid.write(string+"\n") myTimes = list(weather.keys()) myTimes.sort() for i,t in enumerate(myTimes): if t in weather: string = myTimes[i] for v in varNames: string = string + ('\t{0:.2f}'.format(float(weather[t][v]))) else: print(t," hour not in weather???") fid.write(string+"\n") return message def merge(self,oldWeather,newWeather): """ To keep the history of weather data, replace updated values for today. """ today = list(oldWeather.keys())[0] allHours = list(oldWeather[today].keys()) allHours.sort() #varNames = list(oldWeather[today]['23:00'].keys()) #varNames.sort() varNames = self.unit.keys() if self.debug: print("HvcWeather.merge ",today) print(varNames) print(allHours) for h in allHours: #take actual values if existing if h in newWeather[today]: #keep new data pass else: #conserve history newWeather[today][h] = copy.deepcopy(oldWeather[today][h]) if self.debug: print(h," conserved ",newWeather[today][h]) if self.debug: allDays = list(newWeather.keys()) print(allDays) #print("------------------") #for h in allHours: # print(h,newWeather[today][h]) return newWeather def actual(self,today,hour,minutes): """ To have actual data, interpolate hourly values """ weather = self.read(today) try: allHours = list(weather[today].keys()) #varNames = list(weather[today]['23:00'].keys()) except: print("Reading actual weather failed!") print(today) print(allHours) print(weather[today]['23:00']) #varNames.sort() varNames = self.unit.keys() if self.debug: print("HvcWeather.actual ",today,hour,minutes) print(varNames) print(weather[today]) #try to initialize try: lastWeather = weather[today]["00:00"] actWeather = weather[today]["00:00"] except: lastWeather = weather[today]["12:00"] actWeather = weather[today]["12:00"] hourFlt = float(hour) if hourFlt == 23.0: actWeather = weather[today]["23:00"] else: for h in allHours: nextWeather = weather[today][h] hFlt = float(h[0:h.find(":")]) if hFlt>hourFlt: w = float(minutes)/60.0 actWeather = {} for v in varNames: actWeather[v] = (1.0-w)*float(nextWeather[v]) + w*float(lastWeather[v]) break; else: lastWeather = nextWeather #tackle missing sun prediction if actWeather["I"] < 0: actWeather["I"] = actWeather["clouds"] #optionally analyze sun rise and dawn return actWeather class HumidAir: """ Physical properties of humid air """ def __init__(self): self.Ttr=273.16 self.pTr=611.657 self.Rair=287.06 self.Rw=461.52 def pSat(self,T): k1=17.2799 k2=4102.99 k3=237.431 theta = T - self.Ttr; p = self.pTr * exp(k1 - k2/(theta + k3)); return p def densityAir(self,T,p): rhoAir = p/self.Rair/T return rhoAir def relHumidity(self,Tdp,T): phi = self.pSat(Tdp)/self.pSat(T) return phi def absHumidity(self,T,phi): rhoW = self.pSat(T)*phi/self.Rw/T return rhoW def specHumidity(self,rhoW,rhoAir): x = rhoW/rhoAir return x if __name__ == "__main__": """ # test HumidAir HA = HumidAir() TAmb = 20+273.15 Tdp = 10+273.15 p =102.23 rhoAir = HA.densityAir(TAmb,p*1000.0) phi = HA.relHumidity(Tdp,TAmb) rhoW = HA.absHumidity(TAmb,phi) x = HA.specHumidity(rhoW,rhoAir) print('rhoAir',rhoAir) print('phi',phi) print('x',x) exit() """ source = "https://opendata.dwd.de/weather/local_forecasts/mos/MOSMIX_L/single_stations/L648/kml/MOSMIX_L_LATEST_L648.kmz" debug=0 wf = HvcWeather(source) if debug: from datetime import datetime kmlfile = "L648.kml" else: kmlfile = wf.pull() weather = wf.parse(kmlfile) allDates = list(weather.keys()) allDates.sort() if debug: print("allDates:",allDates) if debug: print("1. write weather") msg = wf.write(weather) print(msg) today = allDates[0] else: today = allDates[0] print("merge data of ",today) todaysWeather = wf.read(today) #weather=todaysWeather try: weather = wf.merge(todaysWeather,weather) except: print("Merging of weather failed!") print("Try to recover with actual values") #print("2. write weather") print(wf.write(weather)) if debug: now = datetime.now() today = now.strftime("%Y-%m-%d") hour = now.strftime("%H") minutes = now.strftime("%M") aW = wf.actual(today,hour,minutes) print("actual weather:") print(aW)