Source code for ochanticipy.datasources.chirps.chirps

"""Class to download and load CHIRPS observational precipitation data."""
import calendar
import logging
import ssl
from abc import abstractmethod
from datetime import date, datetime
from pathlib import Path
from typing import Optional, Union
from urllib.request import urlopen

import cftime
import pandas as pd
import requests
import xarray as xr

from ochanticipy.config.countryconfig import CountryConfig
from ochanticipy.datasources.datasource import DataSource
from ochanticipy.utils.check_file_existence import check_file_existence
from ochanticipy.utils.dates import get_date_from_user_input
from ochanticipy.utils.geoboundingbox import GeoBoundingBox

logger = logging.getLogger(__name__)

_FIRST_AVAILABLE_DATE = date(year=1981, month=1, day=1)

_VALID_RESOLUTIONS = (0.05, 0.25)

_BASE_URL = "https://iridl.ldeo.columbia.edu/SOURCES/.UCSB/.CHIRPS/.v2p0/"


class _Chirps(DataSource):
    """
    Base class object to retrieve CHIRPS observational precipitation data.

    Parameters
    ----------
    country_config : CountryConfig
        Country configuration
    geo_bounding_box: GeoBoundingBox
        the bounding coordinates of the area that should be included in the
        data.
    frequency: str
        time resolution of the data to be downloaded. It can be "daily" or
        "monthly".
    resolution: float, default = 0.05
        resolution of data to be downloaded. Can be 0.05 or 0.25.
    start_date: Optional[Union[datetime.date, str]], default = None
        Data will be considered starting from date `start_date`.
        Input can be an ISO8601 string or `datetime.date` object.
        If None, it is set to 1981-1-1.
    end_date: Optional[Union[datetime.date, str]], default = None
        Data will be considered up to date `end_date`.
        Input can be an ISO8601 string or `datetime.date` object.
        If None, it is set to the date for which most recent data
        is available.
    """

    @abstractmethod
    def __init__(
        self,
        country_config: CountryConfig,
        geo_bounding_box: GeoBoundingBox,
        frequency: str,
        date_range_freq: str,
        resolution: float = 0.05,
        start_date: Optional[Union[date, str]] = None,
        end_date: Optional[Union[date, str]] = None,
    ):
        super().__init__(
            country_config=country_config,
            datasource_base_dir="chirps",
            is_public=True,
        )

        # round coordinates to correspond with the grid the CHIRPS data is on
        # non-rounded coordinates can be given to the URL which then
        # automatically rounds them, but for file saving we prefer to do
        # this ourselves
        self._geobb = geo_bounding_box.round_coords(round_val=resolution)
        self._frequency = frequency
        self._resolution = resolution
        self._date_range_freq = date_range_freq

        if start_date is None:
            start_date = _FIRST_AVAILABLE_DATE
        if end_date is None:
            end_date = self._get_last_available_date()

        self._start_date = get_date_from_user_input(start_date)
        self._end_date = get_date_from_user_input(end_date)

        self._check_dates_validity()

        if resolution not in _VALID_RESOLUTIONS:
            raise ValueError(
                f"The given resolution is {self._resolution}, which is "
                "not available. Has to be 0.05 or 0.25."
            )

    def download(  # type: ignore
        self,
        clobber: bool = False,
    ):
        """
        Download the CHIRPS observed precipitation as NetCDF file.

        Parameters
        ----------
        clobber : bool, default = False
            If True, overwrites existing raw files

        Returns
        -------
        The folder where the data is downloaded.
        """
        # Create a list of date tuples
        date_list = self._create_date_list(logging_level=logging.INFO)

        # Data download
        for d in date_list:
            last_filepath = self._download_prep(d=d, clobber=clobber)

        return last_filepath.parents[0]

    def process(self, clobber: bool = False):
        """
        Process the CHIRPS data.

        Should only be called after data has been download.

        Parameters
        ----------
        clobber : bool, default = False
            If True, overwrites existing processed files.

        Returns
        -------
        The folder where the data is processed.
        """
        # Create a list with all raw data downloaded
        filepath_list = self._get_to_be_processed_path_list()

        for filepath in filepath_list:
            try:
                ds = xr.open_dataset(filepath, decode_times=False)
            except ValueError as err:
                raise ValueError(
                    f"The dataset {filepath} is not a valid netcdf file: "
                    "something probbly went wrong during the download. "
                    "Try downloading the file again."
                ) from err
            processed_file_path = self._get_processed_path(filepath)
            processed_file_path.parent.mkdir(parents=True, exist_ok=True)
            last_filepath = self._process(
                filepath=processed_file_path, ds=ds, clobber=clobber
            )

        return last_filepath.parents[0]

    def load(self) -> xr.Dataset:
        """
        Load the CHIRPS data.

        Should only be called after the data
        has been downloaded and processed.

        Returns
        -------
        The processed CHIRPS dataset.
        """
        # Get list of filepaths of files to be loaded

        filepath_list = self._get_to_be_loaded_path_list()

        # Merge all files in one dataset
        if not filepath_list:
            raise FileNotFoundError(
                "Cannot find any netcdf file for the chosen combination "
                "of frequency, resolution and area. Make sure "
                "sure that you have already called the 'process' method."
            )

        try:
            ds = xr.open_mfdataset(
                filepath_list,
            )
            # include the names of all files that are included in the ds
            ds.attrs["included_files"] = [f.stem for f in filepath_list]
        except FileNotFoundError as err:
            raise FileNotFoundError(
                "Cannot find one or more netcdf files corresponding "
                "to the selected range. Make sure that you already "
                "downloaded and processed those data."
            ) from err

        return ds.rio.write_crs("EPSG:4326", inplace=True)

    def _download_prep(self, d, clobber):
        # Preparatory steps for the actual download
        year = str(d.year)
        month = f"{d.month:02d}"
        day = f"{d.day:02d}"
        output_filepath = self._get_raw_path(year=year, month=month, day=day)
        output_filepath.parent.mkdir(parents=True, exist_ok=True)
        url = self._get_url(year=year, month=month, day=day)
        # Actual download
        return self._download(
            filepath=output_filepath,
            url=url,
            clobber=clobber,
        )

    def _create_date_list(self, logging_level=logging.DEBUG):
        """Create list of tuples containing the range of dates of interest."""
        date_list = pd.date_range(
            self._start_date, self._end_date, freq=self._date_range_freq
        ).tolist()

        # Create a message containing information on the downloaded data
        msg = (
            f"{self._frequency.capitalize()} "
            "data will be downloaded, starting from "
            f"{date_list[0]} to {date_list[-1]}."
        )

        logging.log(logging_level, msg)

        return date_list

    def _check_dates_validity(self):
        """Check dates vailidity."""
        end_avail_date = self._get_last_available_date()

        if (
            not _FIRST_AVAILABLE_DATE
            <= self._start_date
            <= self._end_date
            <= end_avail_date
        ):
            raise ValueError(
                "Make sure that the input dates are ordered in the following "
                f"way: {_FIRST_AVAILABLE_DATE} <= {self._start_date} <= "
                f"{self._end_date} <= {end_avail_date}. The two dates above "
                "indicate the range for which CHIRPS data are "
                "currently available."
            )

    def _get_file_name_base(
        self,
        year: str,
        month: str,
    ) -> str:
        if len(month) == 1:
            month = f"0{month}"

        file_name_base = (
            f"{self._country_config.iso3}_chirps_"
            f"{self._frequency}_{year}_{month}_"
        )

        return file_name_base

    @abstractmethod
    def _get_file_name(
        self,
        year: str,
        month: str,
        day: str,
    ) -> str:
        pass

    def _get_raw_path(self, year: str, month: str, day: str) -> Path:
        return self._raw_base_dir / self._get_file_name(
            year=year, month=month, day=day
        )

    def _get_processed_path(self, raw_path: Path) -> Path:
        return self._processed_base_dir / raw_path.parts[-1]

    def _get_location_url(self):
        location_url = (
            f"X/%28{self._geobb.lon_min}%29%28{self._geobb.lon_max}"
            f"%29RANGEEDGES/"
            f"Y/%28{self._geobb.lat_max}%29%28{self._geobb.lat_min}"
            f"%29RANGEEDGES/"
        )

        return location_url

    @abstractmethod
    def _get_url(self, year: str, month: str, day: str):
        pass

    @abstractmethod
    def _get_last_available_date(self):
        pass

    def _get_to_be_processed_path_list(self):
        """Get list of filepaths of files to be processed."""
        date_list = self._create_date_list()

        filepath_list = [
            self._get_raw_path(
                year=f"{d.year}", month=f"{d.month:02d}", day=f"{d.day:02d}"
            )
            for d in date_list
        ]
        filepath_list.sort()

        return filepath_list

    def _get_to_be_loaded_path_list(self):
        """Get list of filepaths of files to be loaded."""
        date_list = self._create_date_list()

        filepath_list = [
            self._get_processed_path(
                self._get_raw_path(
                    year=f"{d.year}",
                    month=f"{d.month:02d}",
                    day=f"{d.day:02d}",
                )
            )
            for d in date_list
        ]
        filepath_list.sort()

        return filepath_list

    @staticmethod
    def _read_csv_from_url(url):
        context = ssl.create_default_context()
        context.set_ciphers("DEFAULT")
        result = urlopen(url, context=context)

        return pd.read_csv(result)

    @check_file_existence
    def _download(self, filepath: Path, url: str, clobber: bool) -> Path:
        logger.info("Downloading CHIRPS NetCDF file.")
        response = requests.get(
            url,
        )
        with open(filepath, "wb") as out_file:
            out_file.write(response.content)
        return filepath

    @check_file_existence
    def _process(self, filepath: Path, ds, clobber: bool) -> Path:
        pass


[docs] class ChirpsMonthly(_Chirps): """ Class object to retrieve CHIRPS observational monthly precipitation data. Parameters ---------- country_config : CountryConfig Country configuration geo_bounding_box: GeoBoundingBox the bounding coordinates of the area that should be included in the data. start_date: Optional[Union[datetime.date, str]], default = None Data will be considered starting from date `start_date`. Input can be an ISO8601 string or `datetime.date` object. If None, it is set to 1981-1-1. end_date: Optional[Union[datetime.date, str]], default = None Data will be considered up to date `end_date`. Input can be an ISO8601 string or `datetime.date` object. If None, it is set to the date for which most recent data is available. Examples -------- >>> from ochanticipy import create_country_config, CodAB, GeoBoundingBox >>> from ochanticipy import ChirpsMonthly >>> import datetime >>> >>> country_config = create_country_config(iso3="bfa") >>> codab = CodAB(country_config=country_config) >>> codab.download() >>> admin0 = codab.load(admin_level=0) >>> geo_bounding_box = GeoBoundingBox.from_shape(admin0) >>> >>> start_date = datetime.date(year=2007, month=10, day=23) >>> end_date = datetime.date(year=2020, month=3, day=2) >>> chirps_monthly = ChirpsMonthly( ... country_config=country_config, ... geo_bounding_box=geo_bounding_box, ... start_date=start_date, ... end_date=end_date ... ) >>> chirps_monthly.download() >>> chirps_monthly.process() >>> chirps_monthly_data = chirps_monthly.load() """ def __init__( self, country_config: CountryConfig, geo_bounding_box: GeoBoundingBox, start_date: Optional[Union[date, str]] = None, end_date: Optional[Union[date, str]] = None, ): super().__init__( country_config=country_config, geo_bounding_box=geo_bounding_box, date_range_freq="SMS", frequency="monthly", resolution=0.05, start_date=start_date, end_date=end_date, ) def _get_file_name( self, year: str, month: str, day: str, ) -> str: file_name_base = self._get_file_name_base( year=year, month=month, ) file_name = ( f"{file_name_base}" f"r{self._resolution}" f"_{self._geobb.get_filename_repr(precision=0)}.nc" ) return file_name def _get_last_available_date(self): """Get the most recent date for which data is available.""" # The url contains a table where the last date available is # specified. url = ( f"{_BASE_URL}" ".monthly/.global/" ".T/last/subgrid/0./add/T/table%3A/1/%3Atable/.csv" ) df = self._read_csv_from_url(url) datetime_object = datetime.strptime(df.values[0][0], "%b %Y") day = calendar.monthrange(datetime_object.year, datetime_object.month)[ 1 ] datetime_object = datetime_object.replace(day=day) return datetime_object.date() def _get_url(self, year: str, month: str, day: str) -> str: # Convert month from month number (in string format) to # three-letter name month_name = calendar.month_abbr[int(month)] location_url = self._get_location_url() url = ( f"{_BASE_URL}" ".monthly/.global/.precipitation/" f"{location_url}" f"T/%28{month_name}%20{year}%29%28{month_name}%20{year}" "%29RANGEEDGES/data.nc" ) return url @check_file_existence def _process(self, filepath: Path, ds, clobber: bool) -> Path: # fix dates ds.oap.correct_calendar(inplace=True) ds = xr.decode_cf(ds) if "prcp" in list(ds.keys()): ds = ds.rename({"prcp": "precipitation"}) xr.Dataset.to_netcdf(ds, path=filepath) return filepath
[docs] class ChirpsDaily(_Chirps): """ Class object to retrieve CHIRPS observational monthly precipitation data. Parameters ---------- country_config : CountryConfig Country configuration geo_bounding_box: GeoBoundingBox the bounding coordinates of the area that should be included in the data. resolution: float, default = 0.05 resolution of data to be downloaded. Can be 0.05 or 0.25. start_date: Optional[Union[datetime.date, str]], default = None Data will be considered starting from date `start_date`. Input can be an ISO8601 string or `datetime.date` object. If None, it is set to 1981-1-1. end_date: Optional[Union[datetime.date, str]], default = None Data will be considered up to date `end_date`. Input can be an ISO8601 string or `datetime.date` object. If None, it is set to the date for which most recent data is available. Examples -------- >>> from ochanticipy import create_country_config, CodAB, GeoBoundingBox, ... ChirpsDaily >>> import datetime >>> >>> country_config = create_country_config(iso3="bfa") >>> codab = CodAB(country_config=country_config) >>> codab.download() >>> admin0 = codab.load(admin_level=0) >>> geo_bounding_box = GeoBoundingBox.from_shape(admin0) >>> start_date = datetime.date(year=2007, month=10, day=23) >>> end_date = datetime.date(year=2020, month=3, day=2) >>> chirps_daily = ChirpsDaily( ... country_config=country_config, ... geo_bounding_box=geo_bounding_box, ... start_date=start_date, ... end_date=end_date ... ) >>> chirps_daily.download() >>> chirps_daily.process() >>> chirps_daily_data = chirps_daily.load() """ def __init__( self, country_config: CountryConfig, geo_bounding_box: GeoBoundingBox, resolution: float = 0.05, start_date: Optional[Union[date, str]] = None, end_date: Optional[Union[date, str]] = None, ): super().__init__( country_config=country_config, geo_bounding_box=geo_bounding_box, date_range_freq="D", frequency="daily", resolution=resolution, start_date=start_date, end_date=end_date, ) def _get_file_name( self, year: str, month: str, day: str, ) -> str: file_name_base = self._get_file_name_base( year=year, month=month, ) if len(day) == 1: day = f"0{day}" file_name = ( f"{file_name_base}" f"{day}_" f"r{self._resolution}" f"_{self._geobb.get_filename_repr(precision=0)}.nc" ) return file_name def _get_last_available_date(self): """Get the most recent date for which data is available.""" # The url contains a table where the last date available is # specified. url = ( f"{_BASE_URL}" ".daily-improved/.global/" f"{str(self._resolution).replace('.', 'p')}/" ".T/last/subgrid/0./add/T/table%3A/1/%3Atable/.csv" ) df = self._read_csv_from_url(url) datetime_object = datetime.strptime(df.values[0][0], "%d %b %Y") return datetime_object.date() def _get_url(self, year: str, month: str, day: str) -> str: # Convert month from month number (in string format) to # three-letter name month_name = calendar.month_abbr[int(month)] location_url = self._get_location_url() url = ( f"{_BASE_URL}" ".daily-improved/.global/." f"{str(self._resolution).replace('.', 'p')}/.prcp/" f"{location_url}" f"T/%28{day}%20{month_name}%20{year}%29%28{day}" f"%20{month_name}%20{year}" "%29RANGEEDGES/data.nc" ) return url @check_file_existence def _process(self, filepath: Path, ds, clobber: bool) -> Path: # fix dates ds = ds.assign_coords( T=cftime.datetime.fromordinal( ds.T.values, calendar="standard", has_year_zero=False ) ) ds.oap.correct_calendar(inplace=True) if "prcp" in list(ds.keys()): ds = ds.rename({"prcp": "precipitation"}) xr.Dataset.to_netcdf(ds, path=filepath) return filepath