Get Weather Forecast
For the heating control, you might be interested in the outside weather. Instead of own sensors, I utilize a public weather forecast service. Especially, if you intend to control the heating in dependency of the todays sun shine, you are interested in the sun shine prediction. The German weather service DWD offers weather prediction data for many towns for free.
My /etc/crontab to execute the script to update the weather forecast 4 times per day looks like:
SHELL=/bin/sh PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin # m h dom mon dow user command 16 4,10,16,22 * * * www-data ( cd /var/www/html/HVC && ./HvcWeather.py > /tmp/cron.log 2>&1 )
Python interface
#!/usr/local/bin/python3.8 """ /usr/bin/wget –no-check-certificate https://opendata.dwd.de/weather/local_foreca sts/mos/MOSMIX_L/single_stations/L648/kml/MOSMIX_L_LATEST_L648.kmz -O /var/www/h tml/HVC/altenstadt.kmz cd /var/www/html/HVC/ && /usr/bin/unzip altenstadt.kmz for f in /var/www/html/HVC/MOSMIX*; do mv "$f" /var/www/html/HVC/altenstadt.kml done """ import os import xml.etree.ElementTree as ET from math import exp import copy class HvcWeather: """ Instead of ambient sensors, utilize some public weather forecast to have an estimate of the sun shine today. In Germany, you get the data for you local region from the Deutscher Wetterdienst: https://opendata.dwd.de/weather/local_forecasts/mos/MOSMIX_L/single_stations/L648/kml/MOSMIX_L_LATEST_L648.kmz You may run this regularly by adding the line: sudo nano /etc/crontab 16 4,10,16,22 * * * www-data ( cd /var/www/html/HVC && ./HvcWeather.py > /tmp/cron.log 2>&1 ) Dr. Arne Jachens 2020-06-22 """ def __init__(self,source): """ Select the variables, you are interested in. The KML file contains much more information then we need, let's find the relevant data... Data content specification: https://opendata.dwd.de/weather/lib/MetElementDefinition.xml TTT Temperature 2m above surface, K Td Dewpoint 2m above surface, K FX3 Maximum wind gust within the last hour, m/s RR1c Rain-Equivalent during the last 1 hours kg/m2 RRS3c Snow-Rain-Equivalent during the last 3 hours kg/m2, nur im Winter Neff Effective cloud cover, % (0..100) PPPP Surface pressure, Pa Rad1h Global Irradiance, kJ/m2 FXh55 Probability of wind gusts >= 55kn within the last 12 hours, % (0..100) SunD Yesterdays total sunshine duration, s SunD3 Sunshine duration during the last three hours, s To test the stuff, set debug = True. """ self.debug = False self.source=source self.vars = ['PPPP','TTT','Td','FX3','RR1c','Neff','Rad1h','FXh55','SunD3'] def pull(self): """ Utilize wget to retrieve the actual weather forecast. """ #utilize wget cmd = "/usr/bin/wget –no-check-certificate " + self.source myLocation = self.source[ self.source.rfind("_")+1: ] cmd = cmd + " -O " + myLocation os.system(cmd) #extract archive cmd = "/usr/bin/unzip " + myLocation os.system(cmd) #rename the file kmlfile = myLocation[:myLocation.find(".")] + ".kml" cmd = "mv MOSMIX*.kml " + kmlfile print(cmd) os.system(cmd) return kmlfile """ Download the document specifications and put them to http://localhost/HVC/ : /usr/bin/wget http://www.opengis.net/kml/2.2 -O ogckml22.xsd /usr/bin/wget https://opendata.dwd.de/weather/lib/pointforecast_dwd_extension_V1_0.xsd """ def parse(self,kmlFile): """ The KML is a specific XML format, use etree to extract the content. The variables of interest are extrected, some units changed, and finally derived humidity data is computed. """ namespaces = {'kml': 'http://www.opengis.net/kml/2.2', 'dwd': 'https://opendata.dwd.de/weather/lib/pointforecast_dwd_extension_V1_0.xsd'} #namespaces = {'kml': './ogckml22.xsd', 'dwd': './pointforecast_dwd_extension_V1_0.xsd'} if os.path.exists(kmlFile): xmlTree = ET.parse(kmlFile) xmlRoot = xmlTree.getroot() else: print (fileName+' does not exist') quit() #time steps for the prediction tag = 'kml:Document/kml:ExtendedData/dwd:ProductDefinition/dwd:ForecastTimeSteps/dwd:TimeStep' timeStepsXml = xmlRoot.findall(tag,namespaces) timeSteps = [] for ts in timeStepsXml: thisText = ts.text timeSteps.append(thisText[:thisText.find(".")]) #select all Forcast elements tag = 'kml:Document/kml:Placemark/kml:ExtendedData' ExtendedData = xmlRoot.findall(tag,namespaces) #put data sets of interest in 'data' structure data={} for v in self.vars: for ed in ExtendedData: thisXML = ed.find("./dwd:Forecast[@dwd:elementName='"+v+"']",namespaces) data[v] = thisXML.find("./dwd:value",namespaces).text.split() #split weather forecast in days an do some basic conversions lastDate="" weather = {} # for i,t in enumerate(timeSteps): for i in range(len(timeSteps)-1): t = timeSteps[i] date = t[:t.find("T")] time = t[t.find("T")+1:] time = time[:time.rfind(":")] # HH:MM if date!=lastDate: lastDate=date weather[date] = {} weather[date][time] = {} for v in self.vars: if v=="TTT" and data[v][i]!="-": TAmb = float(data[v][i])-273.15 weather[date][time]['TAmb'] = float(TAmb) # C elif v=="Td" and data[v][i]!="-": Tdp = float(data[v][i])-273.15 weather[date][time][v] = float(Tdp) # C elif v=="PPPP" and data[v][i]!="-": p = float(data[v][i])/1000.0 weather[date][time]['p'] = p # kPa elif v=="Neff" and data[v][i]!="-": weather[date][time][v] = float(data[v][i])/100.0 # 0...1 elif v=="Rad1h" and data[v][i]!="-": weather[date][time]['I'] = float(data[v][i])/3.6 # W/m2 elif v=="RR1c" and data[v][i]!="-": weather[date][time]['rain'] = float(data[v][i]) # mm/m2 elif v=="FX3" and data[v][i]!="-": weather[date][time]['windMax'] = float(data[v][i]) # m/s elif v=="Neff" and data[v][i]!="-": weather[date][time]['clouds'] = float(data[v][i]) # 0...1 elif v=="FXh55": if data[v][i]=="-": weather[date][time]['storm'] = 0.0 else: weather[date][time]['storm'] = float(data[v][i])/100.0 # 0...1 elif v=="SunD3" and data[v][i+1]!="-": weather[date][time]['sun'] = float(data[v][i+1])/3.0/3600.0 # 0...1 elif data[v][i]!="-": weather[date][time][v] = float(data[v][i]) #derived data HA = HumidAir() rhoAir = HA.densityAir(TAmb+273.15,p*1000.0) phi = HA.relHumidity(Tdp+273.15,TAmb+273.15) rhoW = HA.absHumidity(TAmb+273.15,phi) x = HA.specHumidity(rhoW,rhoAir) weather[date][time]['phi'] = phi*100.0 # 0...100% weather[date][time]['x'] = x*1000.0 # g/kg return weather def read(self,today): """ Read weather data of today, to replace updated data. """ myWeather={} myWeather[today]={} month = today[0:7] fileName = month+"/"+"weather_"+today+".log" if os.path.exists(fileName): with open(fileName,"r") as fid: lines = fid.readlines() varNames = lines[0].split() varNames[0] = varNames[0][1:] for i,l in enumerate(lines): if i>0: values = l.split() hour = values[0] myWeather[today][hour]={} for j,v in enumerate(values): if j>0: myWeather[today][hour][varNames[j]] = float(v) else: print("file "+fileName+" not found!") return myWeather def write(self,weather): """ write weather data to files, one per day """ message = "" dates = list(weather.keys()) dates.sort() try: varNames = list(weather[dates[0]]['23:00'].keys()) varNames.sort() except: print("tryed to access ",dates[0]) for i,d in enumerate(dates): if i<7: today = dates[i] todaysWeather = weather[today] msg = self.__writeTodaysWeather(today,todaysWeather,varNames) message = message + msg return message def __writeTodaysWeather(self,today,weather,varNames): message = "" month = today[0:7] #create logDirectory, if it does not exist yet try: os.mkdir(month) cmd = "chown arne:www-data "+month os.system(cmd) cmd = "chmod g+w "+month os.system(cmd) message = message + "directory "+month+" created \n" except FileExistsError: pass fileName = month+"/"+"weather_"+today+".log" with open(fileName,"w") as fid: string = " #time" for v in varNames: string = string + "\t"+v fid.write(string+"\n") myTimes = list(weather.keys()) myTimes.sort() for i,t in enumerate(myTimes): if t in weather: string = myTimes[i] for v in varNames: string = string + ('\t{0:.2f}'.format(float(weather[t][v]))) else: print(t," hour not in weather???") fid.write(string+"\n") return message def merge(self,oldWeather,newWeather): """ To keep the history of weather data, replace updated values for today. """ today = list(oldWeather.keys())[0] allHours = list(oldWeather[today].keys()) allHours.sort() varNames = list(oldWeather[today]['23:00'].keys()) varNames.sort() if self.debug: print("HvcWeather.merge ",today) print(varNames) print(allHours) for h in allHours: #take actual values if existing if h in newWeather[today]: #keep new data pass else: #conserve history newWeather[today][h] = copy.deepcopy(oldWeather[today][h]) if self.debug: print(h," conserved ",newWeather[today][h]) if self.debug: allDays = list(newWeather.keys()) print(allDays) #print("------------------") #for h in allHours: # print(h,newWeather[today][h]) return newWeather def actual(self,today,hour,minutes): """ To have actual data, interpolate hourly values """ weather = self.read(today) try: allHours = list(weather[today].keys()) varNames = list(weather[today]['23:00'].keys()) except: print("Reading actual weather failed!") print(today) print(allHours) print(weather[today]['23:00']) varNames.sort() if self.debug: print("HvcWeather.actual ",today,hour,minutes) print(varNames) print(weather[today]) #try to initialize try: lastWeather = weather[today]["00:00"] actWeather = weather[today]["00:00"] except: lastWeather = weather[today]["12:00"] actWeather = weather[today]["12:00"] hourFlt = float(hour) if hourFlt == 23.0: actWeather = weather[today]["23:00"] else: for h in allHours: nextWeather = weather[today][h] hFlt = float(h[0:h.find(":")]) if hFlt>hourFlt: w = float(minutes)/60.0 actWeather = {} for v in varNames: actWeather[v] = (1.0-w)*float(nextWeather[v]) + w*float(lastWeather[v]) break; else: lastWeather = nextWeather return actWeather class HumidAir: """ Physical properties of humid air """ def __init__(self): self.Ttr=273.16 self.pTr=611.657 self.Rair=287.06 self.Rw=461.52 def pSat(self,T): k1=17.2799 k2=4102.99 k3=237.431 theta = T - self.Ttr; p = self.pTr * exp(k1 - k2/(theta + k3)); return p def densityAir(self,T,p): rhoAir = p/self.Rair/T return rhoAir def relHumidity(self,Tdp,T): phi = self.pSat(Tdp)/self.pSat(T) return phi def absHumidity(self,T,phi): rhoW = self.pSat(T)*phi/self.Rw/T return rhoW def specHumidity(self,rhoW,rhoAir): x = rhoW/rhoAir return x if __name__ == "__main__": """ # test HumidAir HA = HumidAir() TAmb = 20+273.15 Tdp = 10+273.15 p =102.23 rhoAir = HA.densityAir(TAmb,p*1000.0) phi = HA.relHumidity(Tdp,TAmb) rhoW = HA.absHumidity(TAmb,phi) x = HA.specHumidity(rhoW,rhoAir) print('rhoAir',rhoAir) print('phi',phi) print('x',x) exit() """ source = "https://opendata.dwd.de/weather/local_forecasts/mos/MOSMIX_L/single_stations/L648/kml/MOSMIX_L_LATEST_L648.kmz" debug=0 wf = HvcWeather(source) if debug: from datetime import datetime kmlfile = "L648.kml" else: kmlfile = wf.pull() weather = wf.parse(kmlfile) allDates = list(weather.keys()) allDates.sort() if debug: print("allDates:",allDates) if debug: print("1. write weather") #wf.write(weather) today = allDates[0] else: today = allDates[0] print("merge data of ",today) todaysWeather = wf.read(today) #weather=todaysWeather try: weather = wf.merge(todaysWeather,weather) except: print("Merging of weather failed!") print("Try to recover with actual values") #print("2. write weather") print(wf.write(weather)) if debug: now = datetime.now() today = now.strftime("%Y-%m-%d") hour = now.strftime("%H") minutes = now.strftime("%M") aW = wf.actual(today,hour,minutes) print("actual weather:") print(aW)
Usage of weather data
The provided kml file contains purely the prediction data. To archive the weather data, I write a logfile per day and spend some effort to merge the new prediction with the conserved past data.
Instead of an actual sensor, I interpolate the weather to the current point in time. Additionaly I plot the weather prediction: