Source code for openaq_engine.setup_environment

#!/usr/bin/env python
"""
Setup Environment

Tools for connecting to the database.
"""

import os
from contextlib import contextmanager
from urllib.parse import quote

import pandas as pd
import psycopg2
from sqlalchemy.engine import create_engine
from sqlalchemy.exc import DBAPIError


def get_athena_engine(
    region_name="us-east-1",
    schema_name="default",
    s3_staging_dir="s3://openaq-pm25-historic/pm25-month/cohorts/",
):
    """
    Creates and returns a SQLAlchemy engine for connecting to AWS Athena.

    Credentials are read from the ``AWS_ACCESS_KEY`` and
    ``AWS_SECRET_ACCESS_KEY`` environment variables; an unset variable is
    treated as an empty string rather than the literal text ``"None"``.

    Parameters
    ----------
    region_name : str, optional
        AWS region in which Athena is queried (default ``"us-east-1"``).
    schema_name : str, optional
        Athena schema to connect to (default ``"default"``).
    s3_staging_dir : str, optional
        S3 location passed to Athena as ``s3_staging_dir``
        (default ``"s3://openaq-pm25-historic/pm25-month/cohorts/"``).

    Returns
    -------
    engine : SQLAlchemy Engine
        A SQLAlchemy engine connected to AWS Athena.
    """
    conn_str = (
        "awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}"
        "@athena.{region_name}.amazonaws.com:443/"
        "{schema_name}?s3_staging_dir={s3_staging_dir}"
    )
    access_key = os.getenv("AWS_ACCESS_KEY") or ""
    secret_key = os.getenv("AWS_SECRET_ACCESS_KEY") or ""
    # Secret keys commonly contain "/" and "+", and the staging dir contains
    # ":" and "/"; unescaped, these corrupt the URL. quote(..., safe="")
    # percent-encodes every reserved character (including "+"), so the value
    # round-trips whether the parser decodes with unquote or unquote_plus.
    engine = create_engine(
        conn_str.format(
            aws_access_key_id=quote(access_key, safe=""),
            aws_secret_access_key=quote(secret_key, safe=""),
            region_name=region_name,
            schema_name=schema_name,
            s3_staging_dir=quote(s3_staging_dir, safe=""),
        )
    )
    return engine
def get_dbengine(
    PGDATABASE="",
    PGHOST="",
    PGPORT=5432,
    PGPASSWORD="",
    PGUSER="",
    DBTYPE="postgresql",
):
    """
    Creates and returns a SQLAlchemy engine for connecting to a PostgreSQL database.

    PGDATABASE, PGHOST, PGUSER and PGPASSWORD each fall back to the
    environment variable of the same name when passed as an empty value
    (``""`` or ``None``), so calling with no arguments reads everything
    from the environment.

    Parameters
    ----------
    PGDATABASE : str
        The name of the database to connect to.
    PGHOST : str
        The hostname of the database server.
    PGPORT : int, optional
        The port number to connect to (default is 5432).
    PGPASSWORD : str
        The password for the database user.
    PGUSER : str
        The username for the database.
    DBTYPE : str, optional
        The type of database, default is "postgresql".

    Returns
    -------
    engine : SQLAlchemy Engine
        A SQLAlchemy engine connected to the specified database.
    """
    # Explicit arguments take precedence; previously they were ignored and
    # the environment was always used instead.
    database = PGDATABASE or os.getenv("PGDATABASE") or ""
    host = PGHOST or os.getenv("PGHOST") or ""
    username = PGUSER or os.getenv("PGUSER") or ""
    password = PGPASSWORD or os.getenv("PGPASSWORD") or ""

    # Credentials may contain URL-reserved characters ("@", ":", "/", "%"),
    # which must be percent-encoded before being embedded in the URL.
    credentials = quote(username, safe="")
    if password:
        credentials += ":" + quote(password, safe="")
    userinfo = credentials + "@" if credentials else ""

    str_conn = "{dbtype}://{userinfo}{host}:{port}/{db}".format(
        dbtype=DBTYPE,
        userinfo=userinfo,
        host=host,
        port=PGPORT,
        db=database,
    )
    return create_engine(str_conn)
@contextmanager
def connect_to_db(PGPORT=5432):
    """
    Context manager for connecting to a PostgreSQL database.

    Database name, host, user and password are read from the PGDATABASE,
    PGHOST, PGUSER and PGPASSWORD environment variables.

    Parameters
    ----------
    PGPORT : int, optional
        The port number to connect to (default is 5432).

    Yields
    ------
    conn : SQLAlchemy Connection
        A connection to the PostgreSQL database. It is closed, and the
        engine disposed, when the ``with`` block exits, whether it exits
        normally or by raising.

    Raises
    ------
    SystemExit
        If the connection cannot be established. Exceptions raised inside
        the caller's ``with`` block propagate unchanged.
    """
    engine = get_dbengine(
        PGDATABASE=os.getenv("PGDATABASE"),
        PGHOST=os.getenv("PGHOST"),
        PGPORT=PGPORT,
        PGUSER=os.getenv("PGUSER"),
        PGPASSWORD=os.getenv("PGPASSWORD"),
    )
    try:
        conn = engine.connect()
    except (psycopg2.Error, DBAPIError) as err:
        # SQLAlchemy wraps driver errors in DBAPIError, so catching only
        # psycopg2.Error would miss real connection failures.
        engine.dispose()
        raise SystemExit("Cannot Connect to DB") from err
    try:
        yield conn
    finally:
        # Runs even when the caller's body raises, so neither the connection
        # nor the engine's pool is leaked.
        conn.close()
        engine.dispose()
def run_query(query, params=None):
    """
    Executes a SQL query on the database and returns the result as a pandas DataFrame.

    Parameters
    ----------
    query : str
        The SQL query to execute.
    params : list, tuple or dict, optional
        Values bound to placeholders in ``query``, passed straight through
        to ``pandas.read_sql``. Prefer this over formatting values into the
        query string. The placeholder syntax is driver dependent (PEP 249
        paramstyle). Default None binds no parameters.

    Returns
    -------
    data : pandas DataFrame
        A DataFrame containing the results of the query.
    """
    with connect_to_db() as conn:
        data = pd.read_sql(query, conn, params=params)
    return data
def test_database_connect():
    """
    Tests the database connection by running a simple query.

    Raises
    ------
    AssertionError
        If the query returns fewer than 1 row.
    """
    query = "SELECT * FROM raw.codes LIMIT 10"
    with connect_to_db() as conn:
        data = pd.read_sql_query(query, conn)
    # Any returned row proves the connection and query work; the former
    # "> 1" check wrongly failed when exactly one row came back.
    assert len(data) >= 1, "Query against raw.codes returned no rows"