OpenAQ

Download OpenAQ data and convert it to a Pandas DataFrame

1. Download the data

Using download.py below, download the data in .ndjson.gz format for a given date range (e.g. 1st Jan 2015 to 31st Dec 2015) with:

python download.py 2015-01-01 2015-12-31
#!/usr/bin/env python3
"""
Download script for all OpenAQ data
Credit to https://github.com/barronh/scrapenaq
Downloads all (.ndjson.gz) files between dates
Example:
    python download.py 2020-01-01 2020-12-31
"""

import pandas as pd
import urllib.request
import re
import os
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('startdate', help='Find ndjson.gz created >= startdate')
parser.add_argument('enddate', help='Find ndjson.gz created <= enddate')

args = parser.parse_args()

dates = pd.date_range(args.startdate, args.enddate)
keyre = re.compile('<Key>(.+?)</Key>')
BROOT = 'openaq-fetches.s3.amazonaws.com/'

for date in dates:
    xrpath = (
        'https://{}?delimiter=%2F&'.format(BROOT) +
        'prefix=realtime-gzipped%2F{}%2F'.format(date.strftime('%F'))
    )
    xmlpath = BROOT + date.strftime('realtime-gzipped/%F.xml')
    zippedpath = BROOT + date.strftime('realtime-gzipped/%F')
    
    os.makedirs(zippedpath, exist_ok=True)

    if os.path.exists(xmlpath):
        print('Keeping cached', xmlpath)
    else:
        urllib.request.urlretrieve(xrpath, xmlpath)

    # parse the S3 listing for the keys of that day's .ndjson.gz files
    with open(xmlpath, mode='r') as xmlfile:
        xmltxt = xmlfile.read()
    keys = keyre.findall(xmltxt)
    
    for key in keys:
        url = 'https://' + BROOT + key
        outpath = BROOT + key
        if os.path.exists(outpath):
            print('Keeping cached', outpath)
        else:
            urllib.request.urlretrieve(url, outpath)
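
Once the download finishes, a quick sanity check is to count how many .ndjson.gz files were fetched for each day; a minimal sketch (assuming the mirrored directory layout created by download.py above, here for 2015):

import glob
import os

# count the downloaded .ndjson.gz files per day under the mirrored bucket path
for daydir in sorted(glob.glob('openaq-fetches.s3.amazonaws.com/realtime-gzipped/2015-*')):
    if os.path.isdir(daydir):
        print(daydir, len(glob.glob(os.path.join(daydir, '*.ndjson.gz'))))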

2. Convert to a Pandas DataFrame

Using convert.py below, convert the .ndjson.gz data to a Pandas DataFrame for a given year (e.g. 2015) with:

python convert.py 2015
#!/usr/bin/env python3
"""
Convert the default format for OpenAQ data (.ndjson.gz) to Pandas DataFrame for a given year
"""

import glob
import gzip
import ndjson
import pandas as pd
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('year')
args = parser.parse_args()

year = args.year

path = './'  # change this to the directory where the output CSV should be written
files = sorted(glob.glob('openaq-fetches.s3.amazonaws.com/realtime-gzipped/' + year + '*/*'))

df_list = []
for file in files:
    with gzip.open(file, 'rt', encoding='utf-8') as ds:
        data_ndjson = ds.read()

    data_json = ndjson.loads(data_ndjson)

    df_list.append(pd.json_normalize(data_json))
    
df = pd.concat(df_list, sort=False)

df.set_index('date.utc', inplace=True)
df.index = pd.to_datetime(df.index)

df.to_csv(path + 'openaq_data_' + year + '.csv')
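
Option 2 below reads the data from a parquet file. A minimal sketch of how the yearly CSVs written by convert.py could be combined and converted to parquet (the file names here are assumptions, and to_parquet needs pyarrow or fastparquet installed):

import glob
import pandas as pd

# read the yearly CSVs produced by convert.py and combine them
csv_files = sorted(glob.glob('openaq_data_*.csv'))
df = pd.concat(
    (pd.read_csv(f, index_col='date.utc', parse_dates=True) for f in csv_files),
    sort=False
)

# write a single parquet file for lazy reading with Dask
df.to_parquet('openaq_data.parquet')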

To analyse the OpenAQ data

Option 1: eagerly load into memory using Pandas

import pandas as pd
df = pd.read_csv(
    '/nfs/b0004/Users/earlacoa/openaq/shared/openaq_data_2013-2020_india.csv', 
    parse_dates=['date.utc'],
    usecols=['date.utc', 'parameter', 'value', 'unit', 'coordinates.latitude', 'coordinates.longitude', 'city', 'country'],
    index_col='date.utc'
)
df['2020-02-01':'2020-04-30']
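
From here the usual Pandas operations apply; for example, a sketch of a daily-mean PM2.5 series for one city (the city name is just an illustrative value):

# daily-mean PM2.5 for a single city, using the columns loaded above
pm25 = df[(df.parameter == 'pm25') & (df.city == 'Delhi')]
daily_pm25 = pm25.value.resample('D').mean()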

Option 2: lazily load using Dask with parquet files

import dask.dataframe as dd
from dask.distributed import Client
client = Client()
client
df = dd.read_parquet(
    '/nfs/b0004/Users/earlacoa/openaq/shared/openaq_data_2013-2020_india.parquet',
    columns=['parameter', 'value', 'unit', 'coordinates.latitude', 'coordinates.longitude', 'city', 'country']
)

Dask builds up tasks lazily; to execute them and return a concrete result, use the .compute() method.
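
For instance, a minimal sketch using the columns loaded above:

# trigger the lazy computation, e.g. the mean PM2.5 value across all records
mean_pm25 = df[df.parameter == 'pm25'].value.mean().compute()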

client.close()

Convert a dataset to the OpenAQ format

import glob
import pandas as pd
import xarray as xr
import numpy as np
# existing OpenAQ data for 2015, loaded for reference of the target column layout (not used directly below)
df_openaq = pd.read_csv(
    '/nfs/b0122/Users/earlacoa/openaq/csv/openaq_data_2015_noduplicates.csv',
    index_col='date.utc',
    parse_dates=True
)
# per-year lists to collect the converted observations
df_obs_summaries = {'2014': [], '2015': [], '2016': [], '2017': [], '2018': [], '2019': [], '2020': []}
china_obs_files = glob.glob('/nfs/a68/earlacoa/china_measurements_corrected/*nc')
# map the netCDF variable names to the OpenAQ parameter names
parameters = {'CO': 'co', 'NO2': 'no2', 'O3': 'o3', 'PM10': 'pm10', 'PM2.5': 'pm25', 'SO2': 'so2'}
years = ['2014', '2015', '2016', '2017', '2018', '2019', '2020']
for china_obs_file in china_obs_files:
    ds_obs = xr.open_dataset(china_obs_file)
    
    for parameter in parameters.keys():
        dict_obs = {
            'date.utc': ds_obs.time.values,
            'city': ds_obs.city,
            'unit': 'µg/m³',
            'value': ds_obs[parameter].values,
            'country': 'CN',
            'location': ds_obs.name,
            'parameter': parameters[parameter],
            'sourceName': 'China measurements',
            'sourceType': 'government',
            'date.local': ds_obs.time.values + np.timedelta64(8, 'h'),
            'coordinates.latitude': ds_obs.lat,
            'coordinates.longitude': ds_obs.lon, 
            'averagingPeriod.unit': 'hours',
            'averagingPeriod.value': 1
        }
        df_obs = pd.DataFrame.from_dict(dict_obs)
        df_obs.set_index('date.utc', inplace=True)
        
        # split the converted observations by year via the DatetimeIndex
        for year in years:
            df_obs_summaries[year].append(df_obs.loc[year])
        
    ds_obs.close()

# concatenate each year's observations and write them out in the OpenAQ format
for year in years:
    df_obs_summaries_concat = pd.concat(df_obs_summaries[year])
    df_obs_summaries_concat.to_csv(f'/nfs/a68/earlacoa/china_measurements_corrected/df_obs_summary_{year}.csv')
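
These per-year CSVs share the OpenAQ column layout, so they can be appended straight onto the OpenAQ data; a sketch using the 2015 files referenced above:

import pandas as pd

# the OpenAQ data loaded above and the converted China observations for 2015
df_openaq = pd.read_csv(
    '/nfs/b0122/Users/earlacoa/openaq/csv/openaq_data_2015_noduplicates.csv',
    index_col='date.utc',
    parse_dates=True
)
df_china = pd.read_csv(
    '/nfs/a68/earlacoa/china_measurements_corrected/df_obs_summary_2015.csv',
    index_col='date.utc',
    parse_dates=True
)

# append the converted observations to the OpenAQ DataFrame (shared columns)
df_combined = pd.concat([df_openaq, df_china], sort=False)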