Source code for pcse.fileinput.cabo_weather

# -*- coding: utf-8 -*-
# Copyright (c) 2004-2014 Alterra, Wageningen-UR
# Allard de Wit (allard.dewit@wur.nl), April 2014
import os, sys
import glob
import calendar
import numpy as np
import datetime as dt
import warnings

from ..base_classes import WeatherDataContainer, WeatherDataProvider
from ..util import reference_ET, angstrom, check_angstromAB
from ..exceptions import PCSEError

[docs]class CABOWeatherDataProvider(WeatherDataProvider): """Reader for CABO weather files. :param fname: root name of CABO weather files to read :param fpath: path where to find files, can be absolute or relative. :keyword ETmodel: "PM"|"P" for selecting penman-monteith or Penman method for reference evapotranspiration. Defaults to "PM". :keyword distance: maximum interpolation distance for meteorological variables, defaults to 1 day. :returns: callable like object with meteo records keyed on date. The Wageningen crop models that are written in FORTRAN or FST often use the CABO weather system (http://edepot.wur.nl/43010) for storing and reading weather data. This class implements a reader for the CABO weather files and also implements additional features like interpolation of weather data in case of missing values, conversion of sunshine duration to global radiation estimates and calculation the reference evapotranspiration values for water, soil and plants (E0, ES0, ET0) using the Penman approach. A difference with the old CABOWE system is that the python implementation will read and store all files (e.g. years) available for a certain station instead of loading a new file when crossing a year boundary. .. note:: some conversions are done by the CABOWeaterDataProvider from the units in the CABO weather file for compatibility with WOFOST: - vapour pressure from kPa to hPa - radiation from kJ/m2/day to J/m2/day - rain from mm/day to cm/day. - all evaporation/transpiration rates are also returned in cm/day. *Example* The file 'nl1.003' provides weather data for the year 2003 for the station in Wageningen and can be found in the cabowe/ folder of the WOFOST model distribution. This file can be read using:: >>> weather_data = CABOWeatherDataProvider('nl1', fpath="./meteo/cabowe") >>> print weather_data(datetime.date(2003,7,26)) Weather data for 2003-07-26 (DAY) IRRAD: 12701000.00 J/m2/day TMIN: 15.90 Celsius TMAX: 23.00 Celsius VAP: 16.50 hPa WIND: 3.00 m/sec RAIN: 0.12 cm/day E0: 0.36 cm/day ES0: 0.32 cm/day ET0: 0.31 cm/day Latitude (LAT): 51.97 degr. Longitude (LON): 5.67 degr. Elevation (ELEV): 7.0 m. Alternatively the date in the print command above can be specified as string with format YYYYMMDD or YYYYDDD. """ # Order, name and conversion factor of weather variables variables = [("IRRAD",1e3, "J/m2/day"),("TMIN",1,"Celsius"), ("TMAX",1,"Celsius"),("VAP",10,"hPa"), ("WIND",1,"m/sec"), ("RAIN",0.1,"cm/day")] # Radiation in Kj/m2/day or Sunshine duration has_sunshine = False # Status line and observation no data value status_no_data = -999. weather_no_data = -99. # start and end year firstyear = None lastyear = None # First date when CABO files start first_date = None # temporary array for storing data potential_records = None tmp_data = None def __init__(self, fname, fpath=None, ETmodel="PM", distance=1): WeatherDataProvider.__init__(self) self.ETmodel = ETmodel # Construct search path search_path = self._construct_search_path(fname, fpath) # find available files CABOWE_files, available_years, cache_file = self._find_CABOWEfiles(search_path) # If no cache file can be found, then start loading the CABOWE files if not self._load_cache_file(cache_file, CABOWE_files): self.tmp_data = self._calc_arraysize() # Run through files, read header and location parameters. # Then read meteo data into tmp_data array prev_cb_file = None for yr, cb_file in zip(available_years, CABOWE_files): header, loc_par, records = self._read_file(cb_file) # header info is taken from the first CABOWE file if self.description is None: self.description = header self._set_location_parameters(loc_par, cb_file, prev_cb_file) for rec in records: if len(rec.strip()) == 0: continue if rec.startswith("-999"): # Status line without values continue self._proc_weather_record(rec, yr) prev_cb_file = cb_file # convert sunshine duration to global radiation self._check_angstrom() # Run interpolation, given interpolation distance (default is 1 day) self._interpolate_timeseries(distance) self._make_WeatherDataContainers() # Write data to binary cache file self._write_cache_file(search_path) # Delete array for tmp storage delattr(self, "tmp_data") def _load_cache_file(self, cache_file, CABOWE_files): """Load the weather data from a binary file using cPickle. Also checks if any of the CABOWE files have modification/creation date more recent then the cache_file. In that case reload the weather data from the original CABOWE files. Returns True if loading succeeded, False otherwise """ # If no cache_file defined return False directly if cache_file is None: return False # if date of any CABOWE files > cache file: discard cache file and return False to reload from original files. # retrieved last modification dates of CABOWE files cb_dates = [] for cb_file in CABOWE_files: r = os.stat(cb_file) cb_dates.append(r.st_mtime) # retrieve modification dates of cache file cache_date = os.stat(cache_file).st_mtime if any([cbd > cache_date for cbd in cb_dates]): try: os.remove(cache_file) except OSError as exc: msg = "Failed to remove cache file '%s' due to: %s" % (cache_file, exc) warnings.warn(msg) return False else: # Else load data from cache file and store internally try: self._load(cache_file) return True except Exception as e: msg = "Cache file failed loading! Try to delete cache file: %s" self.logger.warn(msg, cache_file) return False def _write_cache_file(self, search_path): """Write the data loaded from the CABOWE files to a binary file using cPickle """ cache_fname = search_path + ".cache" self._dump(cache_fname) def _construct_search_path(self, fname, fpath): """Construct the path where to look for files""" if fpath is None: # assume CABOWE files in current folder p = os.path.join(os.getcwd(), fname) elif os.path.isabs(fpath): # absolute path specified p = os.path.join(fpath, fname) else: # assume path relative to current folder p = os.path.join(os.getcwd(), fpath, fname) return os.path.normpath(p) def _proc_weather_record(self, rec, fileyr): """Processes record and inserts values into correct place in array""" values = rec.split() try: year = int(values[1]) doy = int(values[2]) weather_obs = np.array(values[3:], dtype=np.float64) except (ValueError,IndexError) as exc: msg = ("Failed to parse line: %s" % rec) raise RuntimeError(msg) # Check if file contents are consistent with file year if year != fileyr: msg = "File with year %s contains record for year %s" raise PCSEError(msg % (fileyr, year)) # Calculate position in tmp_array base on date since first date rec_date = dt.date(year, 1, 1) + dt.timedelta(days=(doy-1)) arraypos = (rec_date - self.first_date).days # insert data at right position in array self.tmp_data[:, arraypos] = weather_obs def _interpolate_timeseries(self, distance=1): """Interpolates gaps using linear interpolation, except for rainfall. distance specifies the interpolation distance: distance=1 will only allow interpolation when the previous and the following day are available, distance=2 allows also when 2 days are missing, etc. Defaults to 1. """ # Kernel for interpolation distance kernel = np.ones((1 + distance*2)) # array for tracing missing values in the self.tmp_data has_data = np.ones_like(self.tmp_data) # find indivual missing observations which are equal to self.weather_no_data (e..g -99.), # set them to np.NaN index = np.where(self.tmp_data == self.weather_no_data) has_data[index] = 0 self.tmp_data[index] = np.NaN # Find missing lines in the CABOWE files, these have not been inserted in tmp_data and # are therefore np.NaN (tmp_data was initialized with np.NaN) index = np.where(np.isnan(self.tmp_data) == True ) has_data[index] = 0 for i, (var, cf, unit) in enumerate(self.variables): if var == "RAIN": # No interpolation on rainfall data continue timeseries_hasdata = has_data[i,:].flatten() if timeseries_hasdata.sum() == timeseries_hasdata.size: # No missing values continue timeseries = self.tmp_data[i,:].flatten() r = np.convolve(timeseries_hasdata, kernel, mode='same') # Find positions for interpolation: hasdata==0 and >=2 neighbours # except the first and last record index = (timeseries_hasdata==0)*(r>=2) index[0] = False index[-1] = False if True not in index: # No positions that can be interpolated continue # Determine positions and y values for interpolation (x, xp, yp) xrange = np.arange(self.potential_records, dtype=np.float) x = xrange[index] xp = xrange[(timeseries_hasdata == 1)] yp = timeseries[(timeseries_hasdata == 1)] y_int = np.interp(x,xp,yp) # put interpolated values back into tmp_data self.tmp_data[i, x.astype(np.int)] = y_int def _make_WeatherDataContainers(self): """Converts the data in self.tmp_data into WeatherDataContainers which are stored in the class dictionary keyed on the date. Records that are incomplete (contain np.NaN values) are skipped. Moreover, if the radiation measurements are in sunshine duration, than the Angstrom equation is used to estimate global radiation. Finally, the evapotranspiration value are calculated for each complete record. """ # Generate prototype weather data container #wdc_proto = self._build_WeatherDataContainer() for i in range(self.potential_records): rec = self.tmp_data[:, i] if True in np.isnan(rec): # Incomplete record: skip continue # Derive date from position in array thisdate = self.first_date + dt.timedelta(days=i) t = {"DAY": thisdate, "LAT": self.latitude, "LON": self.longitude, "ELEV": self.elevation} for obs, (name, cf, unit) in zip(rec, self.variables): if name == "IRRAD" and self.has_sunshine is True: obs = angstrom(thisdate, self.latitude, obs, self.angstA, self.angstB) # angstrom routine returns in J/m2/day, no conversion factor needed t[name] = float(obs) else: t[name] = float(obs)*cf # Reference evapotranspiration in mm/day try: E0, ES0, ET0 = reference_ET(thisdate, t["LAT"], t["ELEV"], t["TMIN"], t["TMAX"], t["IRRAD"], t["VAP"], t["WIND"], self.angstA, self.angstB, self.ETmodel) except ValueError as e: msg = (("Failed to calculate reference ET values on %s. " % thisdate) + ("With input values:\n %s.\n" % str(t)) + ("Due to error: %s" % e)) raise PCSEError(msg) # update record with ET values value convert to cm/day t.update({"E0": E0/10., "ES0": ES0/10., "ET0": ET0/10.}) # Build weather data container from dict 't' wdc = WeatherDataContainer(**t) # add wdc to dictionary for thisdate self._store_WeatherDataContainer(wdc, thisdate) def _calc_arraysize(self): """Returns array of NaNs with size based on min/max year of data.""" self.potential_records = 0 for yr in range(self.firstyear, self.lastyear+1): if calendar.isleap(yr): self.potential_records += 366 else: self.potential_records += 365 t_ar = np.empty((6,self.potential_records), dtype=np.float64) t_ar[:] = np.NaN return t_ar def _set_location_parameters(self, line, cb_file, prev_cb_file): """Parse, check and assign location parameters. """ strvalues = line.split() if len(strvalues) != 5: msg = "Did not find 5 values on location parameter line of file %s" raise PCSEError(msg % file) parnames = ["longitude", "latitude", "elevation", "angstA", "angstB"] for parname, strvalue in zip(parnames, strvalues): try: fvalue = float(strvalue) current_value = getattr(self, parname) if current_value is None: setattr(self, parname, fvalue) else: if abs(current_value - fvalue) > 0.001: raise AttributeError except ValueError as e: msg = "Failed to parse location parameter %s on file %s, value: %s" raise PCSEError(msg % (parname, cb_file, strvalue)) except AttributeError as e: msg = "Inconsistent '%s' location parameter in file %s compared to file %s." raise PCSEError(msg % (parname, cb_file, prev_cb_file)) def _check_angstrom(self): """Checks the Angstrom parameters for consistency. Also sets self.has_sunshine=True when both A and B > 0. """ if self.angstA > 0 and self.angstB > 0: self.has_sunshine=True self.angstA = abs(self.angstA) self.angstB = abs(self.angstB) check_angstromAB(self.angstA, self.angstB) def _read_file(self, fname): with open(fname) as fp: lines = fp.readlines() header = [] location_par = None records = [] for line in lines: l = line.strip() if l.startswith("*"): header.append(l) else: if location_par is None: location_par = l else: records.append(l) return (header, location_par, records) def _find_CABOWEfiles(self, search_path): """Find CABOWEfiles on given path with given name. Also sorts the list, checks for missing years and sets self.firstyear/lastyear and self.first_date. """ cachefile = search_path + ".cache" if not os.path.exists(cachefile): cachefile = None CABOWEfiles = sorted(glob.glob(search_path+".[0-9][0-9][0-9]")) if len(CABOWEfiles) == 0: path, tail = os.path.split(search_path+".???") msg = "No CABO Weather files found when searching for '%s' at %s" raise PCSEError(msg % (tail, path)) available_years = [] for Cfile in CABOWEfiles: path, ext = os.path.splitext(Cfile) ext = ext[1:] if ext.startswith("9"): weather_year = 1000 + int(ext) else: weather_year = 2000 + int(ext) available_years.append(weather_year) self.firstyear = min(available_years) self.lastyear = max(available_years) self.first_date = dt.date(self.firstyear, 1, 1) # Check if years are missing and if so write a warning to the log file all_years = set(range(self.firstyear, self.lastyear+1)) diff = all_years.difference(set(available_years)) if len(diff) > 0: msg = "No CABOWE files found for year(s): %s" % list(diff) warnings.warn(msg) return CABOWEfiles, available_years, cachefile