Source code for openaq_engine.src.preprocessing.filter

import logging
from typing import List

import pandas as pd


logger = logging.getLogger(__name__)


def _keep_where(df, column, predicate):
    """Return a copy of the rows of ``df`` whose ``column`` value satisfies ``predicate``."""
    # ``astype(bool)`` keeps the mask boolean even for an empty frame, where
    # ``apply`` hands back an object-dtype Series.
    mask = df[column].apply(predicate).astype(bool)
    # Copy so the result is independent of ``df`` and later column assignments
    # by the caller do not trigger chained-assignment warnings.
    return df.loc[mask].copy()


def _coerce_float(value):
    """Convert ``value`` with ``float()``, mapping ``None``/``pd.NA`` to NaN."""
    if value is None or value is pd.NA:
        # NaN compares False against every bound, so missing readings are
        # dropped by the numeric filters instead of raising TypeError.
        return float("nan")
    return float(value)


def _parse_name_field(raw):
    """Split a raw country/city cell into the list of names it contains."""
    if isinstance(raw, str):
        # A cell such as "[US, CA]" is a list rendered as text.
        if raw.startswith("[") and raw.endswith("]"):
            return raw.strip("[]").split(", ")
        return [raw]
    if isinstance(raw, (list, tuple, set, frozenset)):
        return [str(item) for item in raw]
    # None, NaN and any other non-text value carry no name to match.
    return []


def _filter_by_names(df, column, names, label):
    """Keep the rows whose ``column`` cell contains at least one of ``names``."""
    wanted = set(names)
    kept = _keep_where(
        df,
        column,
        lambda raw: not wanted.isdisjoint(_parse_name_field(raw)),
    )
    logger.debug("%s filter kept %d of %d rows", label, len(kept), len(df))
    return kept


class Filter:
    """
    Row filters for air quality measurement DataFrames.

    Every method is static, leaves the input DataFrame unmodified and returns
    a new DataFrame holding the rows that pass the filter, with the original
    index labels and columns preserved.
    """

    @staticmethod
    def filter_pollutant(
        df: pd.DataFrame, pollutant_to_predict: str
    ) -> pd.DataFrame:
        """
        Filters the DataFrame for rows containing the specified pollutant.

        The match is a substring test against the string form of the
        ``parameter`` column, so ``"pm25"`` matches both ``"pm25"`` and a
        text-encoded list such as ``"['pm25']"``.

        Parameters
        ----------
        df : pd.DataFrame
            The DataFrame containing air quality data. Must have a
            ``parameter`` column.
        pollutant_to_predict : str
            The pollutant to filter for.

        Returns
        -------
        pd.DataFrame
            The filtered DataFrame containing only rows with the specified
            pollutant.
        """
        return _keep_where(
            df,
            "parameter",
            lambda pollutant: pollutant_to_predict in str(pollutant),
        )

    @staticmethod
    def filter_no_coordinates(df: pd.DataFrame) -> pd.DataFrame:
        """
        Filters the DataFrame to remove rows with empty coordinates.

        A row counts as having empty coordinates when the string form of its
        ``coordinates`` value is exactly ``"{}"``.

        Parameters
        ----------
        df : pd.DataFrame
            The DataFrame containing coordinate data. Must have a
            ``coordinates`` column.

        Returns
        -------
        pd.DataFrame
            The filtered DataFrame with rows that have empty coordinates
            removed.
        """
        return _keep_where(
            df, "coordinates", lambda coords: str(coords) != "{}"
        )

    @staticmethod
    def filter_non_null_values(df: pd.DataFrame) -> pd.DataFrame:
        """
        Filters the DataFrame to remove rows with missing or negative values.

        Rows are kept when ``float(value) >= 0``; zero is kept. Missing
        values (``None``, ``pd.NA``, NaN) are dropped.

        Parameters
        ----------
        df : pd.DataFrame
            The DataFrame containing air quality data. Must have a ``value``
            column.

        Returns
        -------
        pd.DataFrame
            The filtered DataFrame with rows containing missing or negative
            values removed.

        Raises
        ------
        ValueError
            If a value is a string that cannot be parsed as a number.
        """
        return _keep_where(
            df, "value", lambda reading: _coerce_float(reading) >= 0
        )

    @staticmethod
    def filter_extreme_values(
        df: pd.DataFrame, max_value: float = 500
    ) -> pd.DataFrame:
        """
        Filters the DataFrame to remove rows with extreme values.

        Rows are kept when ``float(value) <= max_value``. Missing values
        (``None``, ``pd.NA``, NaN) cannot be compared and are dropped.

        Parameters
        ----------
        df : pd.DataFrame
            The DataFrame containing air quality data. Must have a ``value``
            column.
        max_value : float, optional
            The largest value that is still kept. Defaults to 500.

        Returns
        -------
        pd.DataFrame
            The filtered DataFrame with rows containing values above
            ``max_value`` removed.

        Raises
        ------
        ValueError
            If a value is a string that cannot be parsed as a number.
        """
        return _keep_where(
            df, "value", lambda reading: _coerce_float(reading) <= max_value
        )

    @staticmethod
    def filter_countries(
        df: pd.DataFrame, countries: List[str]
    ) -> pd.DataFrame:
        """
        Filters the DataFrame for specific countries.

        A ``country`` cell is either a single name or a text-encoded list
        such as ``"[US, CA]"``; a row is kept when any of its names equals
        one of ``countries``. Rows with a missing country are dropped.

        Parameters
        ----------
        df : pd.DataFrame
            The DataFrame containing location data. Must have a ``country``
            column.
        countries : list of str
            The list of countries to filter for.

        Returns
        -------
        pd.DataFrame
            The filtered DataFrame containing only rows from the specified
            countries.
        """
        return _filter_by_names(df, "country", countries, "country")

    @staticmethod
    def filter_cities(df: pd.DataFrame, cities: List[str]) -> pd.DataFrame:
        """
        Filters the DataFrame for specific cities.

        A ``city`` cell is either a single name or a text-encoded list such
        as ``"[Boston, Austin]"``; a row is kept when any of its names equals
        one of ``cities``. Rows with a missing city are dropped.

        Parameters
        ----------
        df : pd.DataFrame
            The DataFrame containing location data. Must have a ``city``
            column.
        cities : list of str
            The list of cities to filter for.

        Returns
        -------
        pd.DataFrame
            The filtered DataFrame containing only rows from the specified
            cities.
        """
        return _filter_by_names(df, "city", cities, "city")