import json
import time
from typing import Any, List
import boto3
import numpy as np
import pandas as pd
import requests
from pydantic.json import pydantic_encoder
from setup_environment import connect_to_db
def read_csv(path: str, **kwargs: Any) -> pd.DataFrame:
    """
    Read a csv, ensuring that NaN-like strings are not parsed as NaN.
    """
    return pd.read_csv(
        path,
        sep=",",
        low_memory=False,
        encoding="utf-8",
        na_filter=False,
        **kwargs,
    )
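# A minimal usage sketch (the file name and columns are hypothetical):
#     df = read_csv("data/notes.csv", usecols=["id", "note"])
#     # Empty cells stay as "" rather than NaN because na_filter=False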
def write_csv(df: pd.DataFrame, path: str, **kwargs: Any) -> None:
    """
    Write a csv to the provided path, ensuring that the correct encoding and
    escape characters are applied.
    Needed when csvs contain text with HTML tags or lists inside cells.
    """
    df.to_csv(
        path,
        index=False,
        na_rep="",
        sep=",",
        lineterminator="\n",
        encoding="utf-8",
        escapechar="\r",
        **kwargs,
    )
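# Example round trip (hypothetical path); the escapechar keeps embedded HTML
# tags and list-like cells from breaking the csv layout:
#     write_csv(df, "data/clean_notes.csv")
#     df_again = read_csv("data/clean_notes.csv")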
def query_results_from_api(headers, url):
    """Issue a GET request against the url and return the raw response."""
    response = requests.get(url, headers=headers)
    return response
def api_response_to_df(url):
    """Query the API and load the "results" field into a DataFrame."""
    headers = {"accept": "application/json"}
    response = query_results_from_api(headers, url)
    try:
        # response.json() already decodes the payload, so no json.loads needed
        return pd.DataFrame(response.json()["results"])
    except KeyError:
        # No "results" key in the payload; return None
        return None
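# Usage sketch, assuming an endpoint that returns {"results": [...]} as JSON
# (the URL is hypothetical):
#     df = api_response_to_df("https://api.example.com/v1/records?page=1")
#     # Returns None if the payload has no "results" key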
def query_results_from_aws(params, query, wait=True):
    """Run an Athena query, optionally polling until it completes."""
    session = boto3.Session()
    client = session.client("athena", params["region"])
    response_query_execution_id = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": "default"},
        ResultConfiguration={
            "OutputLocation": f"s3://{params['bucket']}/{params['path']}/"
        },
    )
    if not wait:
        return response_query_execution_id["QueryExecutionId"]
    # Poll for completion: 360,000 iterations at 1 ms apiece is roughly a
    # 6 minute timeout (plus the latency of the API calls themselves).
    iterations = 360000
    while iterations > 0:
        iterations -= 1
        response_get_query_details = client.get_query_execution(
            QueryExecutionId=response_query_execution_id["QueryExecutionId"]
        )
        status = response_get_query_details["QueryExecution"]["Status"][
            "State"
        ]
        if status in ("FAILED", "CANCELLED"):
            failure_reason = response_get_query_details["QueryExecution"][
                "Status"
            ]["StateChangeReason"]
            print(failure_reason)
            return False, False
        elif status == "SUCCEEDED":
            # Fetch the output of the completed query
            return client.get_query_results(
                QueryExecutionId=response_query_execution_id[
                    "QueryExecutionId"
                ]
            )
        time.sleep(0.001)
    # Timed out without reaching a terminal state
    return False
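# Usage sketch; region, bucket and path are placeholder values, and the
# caller needs AWS credentials that are allowed to run Athena queries:
#     params = {"region": "us-east-1", "bucket": "my-bucket", "path": "athena-results"}
#     result = query_results_from_aws(params, "SELECT * FROM my_table LIMIT 10")
#     # With wait=False only the QueryExecutionId string is returned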
def get_s3_file_path_list(resource, bucket, folder):
    """List the keys of all csv files under a folder in an S3 bucket."""
    csv_filetype = ".csv"
    my_bucket = resource.Bucket(bucket)
    csv_list = []
    for object_summary in my_bucket.objects.filter(Prefix=folder):
        if object_summary.key.endswith(csv_filetype):
            csv_list.append(object_summary.key)
    return csv_list
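# Usage sketch with a boto3 S3 resource (bucket and folder are placeholders):
#     s3 = boto3.resource("s3")
#     keys = get_s3_file_path_list(s3, "my-bucket", "exports/2023/")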
def write_dataclass(dclass: object, path: str) -> None:
    """
    Write a dataclass to the provided path as JSON
    """
    with open(path, "w") as f:
        f.write(
            json.dumps(
                dclass, indent=4, ensure_ascii=True, default=pydantic_encoder
            )
        )
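# Usage sketch, assuming a pydantic dataclass instance (pydantic_encoder
# handles types that the stdlib json module cannot serialise on its own):
#     from pydantic.dataclasses import dataclass
#
#     @dataclass
#     class Config:
#         name: str
#         threshold: float
#
#     write_dataclass(Config(name="baseline", threshold=0.5), "config.json")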
def get_categorical_feature_indices(df: pd.DataFrame) -> List[int]:
    """Return the positional indices of columns with the category dtype."""
    return list(np.where(df.dtypes == "category")[0])
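# Example: columns must already carry the pandas "category" dtype (useful
# e.g. when passing categorical feature indices to a model library):
#     df = pd.DataFrame({"a": [1, 2], "b": pd.Categorical(["x", "y"])})
#     get_categorical_feature_indices(df)  # -> [1]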
def json_provider(file_path, cmd_name):
    """Load a json config file; cmd_name is unused but kept so the function
    matches the provider call signature expected by its caller."""
    with open(file_path) as config_data:
        return json.load(config_data)
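# Usage sketch (hypothetical file name); the second argument is ignored:
#     config = json_provider("config.json", "train")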
def parametrized(dec):
    """Meta-decorator that lets a decorator accept its own arguments."""
    def layer(*args, **kwargs):
        def repl(f):
            # Forward the decorated function plus the decorator's arguments
            return dec(f, *args, **kwargs)
        return repl
    return layer
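# Example: turning a decorator that takes extra arguments into one usable
# with @-syntax (the multiply decorator below is illustrative only):
#     @parametrized
#     def multiply(f, factor):
#         def wrapper(*args, **kwargs):
#             return factor * f(*args, **kwargs)
#         return wrapper
#
#     @multiply(3)
#     def add(a, b):
#         return a + b
#
#     add(1, 2)  # -> 9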
def get_data(query):
    """
    Pulls data from the db based on the query.

    Parameters
    ----------
    query : str
        SQL query to run against the database

    Returns
    -------
    pd.DataFrame
        Dump of the query results into a DataFrame
    """
    with connect_to_db() as conn:
        df = pd.read_sql_query(query, conn)
        return df
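# Usage sketch; assumes connect_to_db() yields a live connection, and the
# table name is a placeholder:
#     df = get_data("SELECT * FROM my_schema.my_table LIMIT 100")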
def write_to_db(
    df,
    engine,
    table_name,
    schema_name,
    table_behaviour,
    index=False,
    **kwargs,
):
    """Write a DataFrame to a database table through the given engine."""
    # with engine.begin() as connection:
    #     connection.execute(text("""SET ROLE "pakistan-ihhn-role" """))
    df.to_sql(
        name=table_name,
        schema=schema_name,
        con=engine,
        if_exists=table_behaviour,
        index=index,
        **kwargs,
    )
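# Usage sketch with a SQLAlchemy engine (connection string, table and schema
# are placeholders; table_behaviour is passed to pandas' if_exists):
#     from sqlalchemy import create_engine
#
#     engine = create_engine("postgresql://user:pass@host:5432/dbname")
#     write_to_db(df, engine, "my_table", "my_schema", table_behaviour="append")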
def ee_array_to_df(arr, list_of_bands):
    """Transforms client-side ee.Image.getRegion array to pandas.DataFrame."""
    df = pd.DataFrame(arr)
    # The first row of the array holds the column names.
    headers = df.iloc[0].tolist()
    df = pd.DataFrame(df.values[1:], columns=headers)
    # Remove rows without data inside.
    df = df[["longitude", "latitude", "time", *list_of_bands]].dropna()
    # Convert the data to numeric values.
    df["longitude"] = pd.to_numeric(df["longitude"], errors="coerce")
    df["latitude"] = pd.to_numeric(df["latitude"], errors="coerce")
    df["time"] = pd.to_numeric(df["time"], errors="coerce")
    for band in list_of_bands:
        df[band] = pd.to_numeric(df[band], errors="coerce")
    # Convert the time field into a datetime.
    df["datetime"] = pd.to_datetime(df["time"], unit="ms")
    # Keep the columns of interest.
    df = df[["longitude", "latitude", "time", "datetime", *list_of_bands]]
    return df.reset_index(drop=True)
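# Usage sketch with the earthengine-api client (collection, band, point and
# scale are placeholder choices, and ee.Initialize() must already have run):
#     import ee
#
#     arr = (
#         ee.ImageCollection("ECMWF/ERA5/DAILY")
#         .select(["mean_2m_air_temperature"])
#         .getRegion(ee.Geometry.Point(67.0, 24.8), scale=30000)
#         .getInfo()
#     )
#     df = ee_array_to_df(arr, ["mean_2m_air_temperature"])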