As a lifelong Utahn, I began to wonder: how bad is the pollution, really? The news reporters seem to think it’s pretty bad. The politicians say it’s never been better. So how bad is it? What impact does it have on things like real estate values? How many people are affected?
As we continue our series analyzing Utah’s air quality with Randy Zwitch, Senior Developer Advocate at MapD, we now turn our focus to cleaning the data we received from the EPA’s Air Quality API. In addition, we will cover how to use that data to calculate an Air Quality Index (AQI) score and how to export the results for import into MapD, which we will use to further analyze the data.
Analyzing Utah’s Air Quality
- Part 1: Connecting to the EPA’s AQS Data API
- Part 2: Cleaning and Transforming AQS Data
- Part 3: Using Shapefiles and Assigning AQI Stations in MapD
- Part 4: Building the Utah AQI Dashboard in MapD
- Part 5: Final Analysis: Air Quality Findings
When it comes to data analysis, most new analysts tend to jump right into making calculations and designing data visualizations. It’s a bit like a kid on Christmas morning: they just want to start ripping into the presents. But with data analysis, a lot of upfront investigation, planning, and cleanup has to happen first in order to arrive at valid conclusions. We now have a set of raw data from the EPA; the next steps are to become more familiar with the data, plan how we are going to use it based on the desired outputs of the project, and do some general cleanup to make our analysis easier.
General Cleanup of the Data
After we have completed the process outlined in Part 1 of this series, we are left with a Pandas Dataframe that contains the raw output of our query against the EPA Air Quality API. The Dataframe contains all of the information we need to calculate a daily air quality index for an entire year, by county, for the state of Utah; however, we need to do some basic cleanup before we move on to calculating the overall air quality index.
#Remove 'END OF FILE' entries
df = df[df['Latitude'] != 'END OF FILE']

#Remove all but 1-hour observations
df = df[df['Sample Duration'] == '1 HOUR']

#Force codes to int
df['County Code'] = df['County Code'].astype(int)
df['Site Num'] = df['Site Num'].astype(int)

#Sorting
df = df.sort_values(['County Code', 'Site Num', 'Parameter Code', 'Date Local', '24 Hour Local'])
Remove ‘END OF FILE’ entries
After we have populated a Dataframe with API query results, one of the very first things we like to do is examine the contents to get a better feel for how the Dataframe looks. We can do this easily using the head and tail functions in Pandas.
#Examine the last 5 rows of a Dataframe
df.tail()
In viewing the last few rows of data, we see that the API response injected an “END OF FILE” entry that we will want to remove from our dataset; again, this is easily done with Pandas.
#Remove 'END OF FILE' entries
df = df[df['Latitude'] != 'END OF FILE']
Remove Non-1 Hour Observations
In reviewing the requirements for calculating a daily air quality index, we discovered that a fairly complex rolling average was required. To simplify the processing, we decided to only evaluate data from air quality reporting stations that submitted reports based on an “hourly” observation.
So let’s first examine what our data looks like to see if this approach is even feasible. In looking through the raw data, we find a column labeled ‘Sample Duration’; we assume this holds the timeframe used to collect the air quality sample. We can further inspect this column by returning its unique values and associated counts.
#Return unique values and associated counts
df['Sample Duration'].value_counts()
Let’s further clean up our Dataframe by removing any observations that were not collected using a ‘1 Hour’ observation method:
#Remove all but 1-hour observations
df = df[df['Sample Duration'] == '1 HOUR']
Sort the Data
In examining our data, there are three geographic levels that are of interest to us (state, county, data collection site), and in order to further refine our processing of the data, we will want to sort the raw data by this geographic hierarchy. We have already limited our query to the state of Utah, so we won’t need to do anything special to roll the data up to the state level; however, within the state results we have data for each county, and within each county we could have multiple sites that were used to collect the data.
Additionally, we will want to sort by pollutant, date, and hour. This will become a very important step later as we begin to calculate a rolling average, which is a requirement of the EPA when calculating a daily Air Quality Index score.
In order to better evaluate and process the data, let’s sort the data into our desired order.
#Force codes to int for sorting
df['County Code'] = df['County Code'].astype(int)
df['Site Num'] = df['Site Num'].astype(int)

#Sorting
df = df.sort_values(['County Code', 'Site Num', 'Parameter Code', 'Date Local', '24 Hour Local'])
In plain English, this organizes our data into a sorted data set. At the highest level we have the state of Utah, since that is what our API query was constrained to. Within the state we group by county; within each county we sort by data collection site; within each site we sort by each type of pollutant that is measured, then by date, then by hour of day.
An example structure, after the data is sorted, would look like this:
├── Utah
└── Salt Lake County
└── Site 101
└── Ozone
└── January 1, 2016
└── 10:00am
└── Sample Measurement
Calculate Daily Air Quality Index Score for Each County/Pollutant
Are we having any fun? Let’s have some fun!
The pollution data we have is measured every hour, but the EPA requires a rolling average (8-hour, in the case of ozone) in order to calculate a daily Air Quality Index score. In more general terms, when you see the news report things like “Today is a RED day, please carpool,” that “RED” is a color code mapped to a peak measurement for a given day.
Perhaps an example will help give some clarity here before we jump into the code.
Let’s say we are measuring ozone and have hourly readings throughout the day.
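For illustration, here is a minimal Pandas sketch with made-up hourly values, chosen so the 8-hour average comes out to 0.080 ppm (80 ppb):

import pandas as pd

# Illustrative hourly ozone readings in ppm (made up for this example);
# the full 8-hour window averages to 0.080 ppm (80 ppb)
hourly = pd.Series([0.072, 0.075, 0.078, 0.080, 0.082, 0.084, 0.085, 0.084])

# 8-hour rolling average; only the last row has a complete window
print(hourly.rolling(8, min_periods=8).mean())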
From the above hourly data, a rolling 8-hour average is calculated for each hour of the day. The maximum value for the day, in this example 80 ppb (0.080 ppm), is then used in the AQI formula (more on that later) to calculate the AQI for the day; in this case the Air Quality Index, or AQI, is 133. An AQI of 133 would be reported as “Unhealthy for Sensitive Groups” or “Orange” (again, more on that later).
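To make that concrete, using the ozone breakpoints defined later in this post: 0.080 ppm falls in the 0.071–0.085 ppm band, which maps to index values 101–150. The interpolated index is therefore (150 - 101) / (0.085 - 0.071) × (0.080 - 0.071) + 101 = 132.5, which rounds to 133.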
Now let’s break down the code that we use to calculate a daily AQI.
# Create a combined dataframe which concats each of the processing dataframes
# (df_activePollutant) for each loop through county/site/pollutant
dfCombined = pd.DataFrame()

# Get a list of County Codes based on current dataset
activeCounties = df['County Code'].unique()

# Loop through each unique county code, create a processing df, process AQI calculations
i = 0
while i < len(activeCounties):
    #print("Processing Active County", activeCounties[i])

    # Return a data frame for the active county being processed
    df_activeCounty = df.loc[(df['County Code'] == activeCounties[i])]

    # From the active county, get an array of sites to process
    activeSites = df_activeCounty['Site Num'].unique()

    # Loop through each site, within the active county
    j = 0
    while j < len(activeSites):
        print("Processing Active Site", activeSites[j])

        # Call calc_aqi for each pollutant
        # PARAMS:
        #   primary data frame (df)
        #   active processing county
        #   active processing site
        #   pollutant code
        #   number of decimal places for rounding the pollutant's rolling average
        #   number of hours needed for rolling average
        #   pollutant lower breakpoint dictionary
        #   pollutant upper breakpoint dictionary

        #Ozone 44201
        print("NOW PROCESSING OZONE")
        df_activePollutant = calc_aqi(df,activeCounties[i],activeSites[j],44201,3,8,ozone_breaks_lower,ozone_breaks_upper)
        dfCombined = pd.concat([dfCombined,df_activePollutant])

        #PM2.5 88101
        print("NOW PROCESSING PM2.5")
        df_activePollutant = calc_aqi(df,activeCounties[i],activeSites[j],88101,1,24,pm25_breaks_lower,pm25_breaks_upper)
        dfCombined = pd.concat([dfCombined,df_activePollutant])

        #PM10 81102 (integer truncation per the EPA, so 0 decimal places)
        print("NOW PROCESSING PM10")
        df_activePollutant = calc_aqi(df,activeCounties[i],activeSites[j],81102,0,24,pm10_breaks_lower,pm10_breaks_upper)
        dfCombined = pd.concat([dfCombined,df_activePollutant])

        #CO 42101
        print("NOW PROCESSING CO")
        df_activePollutant = calc_aqi(df,activeCounties[i],activeSites[j],42101,1,8,co_breaks_lower,co_breaks_upper)
        dfCombined = pd.concat([dfCombined,df_activePollutant])

        #SO2 42401 (integer truncation)
        print("NOW PROCESSING SO2")
        df_activePollutant = calc_aqi(df,activeCounties[i],activeSites[j],42401,0,1,so2_breaks_lower,so2_breaks_upper)
        dfCombined = pd.concat([dfCombined,df_activePollutant])

        #NO2 42602 (integer truncation; note the NO2 breakpoints, not SO2's)
        print("NOW PROCESSING NO2")
        df_activePollutant = calc_aqi(df,activeCounties[i],activeSites[j],42602,0,1,no2_breaks_lower,no2_breaks_upper)
        dfCombined = pd.concat([dfCombined,df_activePollutant])

        j += 1
    i += 1
The first thing we will want to do is create a new Dataframe that will be used to store all of our calculations.
# Create a combined dataframe which concats each of the processing dataframes
# (df_activePollutant) for each loop through county/site/pollutant
dfCombined = pd.DataFrame()
If you remember from our data structure, counties can have multiple data collection sites, and sites can have multiple pollutants that are measured, so we will need to traverse that hierarchy in a structured way.
Let's start by getting the list of counties for the given state.
# Get a list of County Codes based on current dataset
activeCounties = df['County Code'].unique()
In our example, Utah has 14 counties that supply data, so we will need to loop through each of those counties to start.
# Loop through each unique county code, create a processing df, process AQI calculations
i = 0
while i < len(activeCounties):
Next, each county can have multiple sites used to collect air quality data, so we will need to get a list of sites within each county.
# Return a data frame for the active county being processed
df_activeCounty = df.loc[(df['County Code'] == activeCounties[i])]

# From the active county, get an array of sites to process
activeSites = df_activeCounty['Site Num'].unique()

# Loop through each site, within the active county
j = 0
while j < len(activeSites):
Now that we have identified a specific site, remember that each site can gather data about multiple pollutants, so we will need to evaluate each potential pollutant for each site.
#Ozone 44201
print("NOW PROCESSING OZONE")
df_activePollutant = calc_aqi(df,activeCounties[i],activeSites[j],44201,3,8,ozone_breaks_lower,ozone_breaks_upper)
dfCombined = pd.concat([dfCombined,df_activePollutant])

#PM2.5 88101
print("NOW PROCESSING PM2.5")
df_activePollutant = calc_aqi(df,activeCounties[i],activeSites[j],88101,1,24,pm25_breaks_lower,pm25_breaks_upper)
dfCombined = pd.concat([dfCombined,df_activePollutant])

#PM10 81102 (integer truncation per the EPA, so 0 decimal places)
print("NOW PROCESSING PM10")
df_activePollutant = calc_aqi(df,activeCounties[i],activeSites[j],81102,0,24,pm10_breaks_lower,pm10_breaks_upper)
dfCombined = pd.concat([dfCombined,df_activePollutant])

#CO 42101
print("NOW PROCESSING CO")
df_activePollutant = calc_aqi(df,activeCounties[i],activeSites[j],42101,1,8,co_breaks_lower,co_breaks_upper)
dfCombined = pd.concat([dfCombined,df_activePollutant])

#SO2 42401 (integer truncation)
print("NOW PROCESSING SO2")
df_activePollutant = calc_aqi(df,activeCounties[i],activeSites[j],42401,0,1,so2_breaks_lower,so2_breaks_upper)
dfCombined = pd.concat([dfCombined,df_activePollutant])

#NO2 42602 (integer truncation; note the NO2 breakpoints, not SO2's)
print("NOW PROCESSING NO2")
df_activePollutant = calc_aqi(df,activeCounties[i],activeSites[j],42602,0,1,no2_breaks_lower,no2_breaks_upper)
dfCombined = pd.concat([dfCombined,df_activePollutant])
Then we move on to the next collection site.
j += 1
Then on to the next county.
i += 1
And then we do it all over again until we have processed data for every collection site, in every county in the state that reports air quality data.
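As an aside, the same traversal can be written more idiomatically with Pandas groupby. This is just a sketch, not the code we ran, and it assumes the same calc_aqi function and breakpoint dictionaries defined below:

# Sketch: iterate county/site pairs via groupby instead of nested while loops
for (county, site), _ in df.groupby(['County Code', 'Site Num']):
    for code, rnd, hours, lo, hi in [
        (44201, 3, 8,  ozone_breaks_lower, ozone_breaks_upper),  # Ozone
        (88101, 1, 24, pm25_breaks_lower,  pm25_breaks_upper),   # PM2.5
        (81102, 0, 24, pm10_breaks_lower,  pm10_breaks_upper),   # PM10
        (42101, 1, 8,  co_breaks_lower,    co_breaks_upper),     # CO
        (42401, 0, 1,  so2_breaks_lower,   so2_breaks_upper),    # SO2
        (42602, 0, 1,  no2_breaks_lower,   no2_breaks_upper),    # NO2
    ]:
        dfCombined = pd.concat([dfCombined, calc_aqi(df, county, site, code, rnd, hours, lo, hi)])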
If you review our processing code, you’ll see that we condensed most of the calculations into a single function call.
df_activePollutant = calc_aqi(df,activeCounties[i],activeSites[j],88101,1,24,pm25_breaks_lower,pm25_breaks_upper)
We need to unpack what is happening here, as this is where the actual AQI is calculated.
Let's break down the calc_aqi function:
The calc_aqi function takes the following parameters:
- df - Our master Dataframe
- activeCounties[i] - The active county currently being processed
- activeSites[j] - The active data collection site currently being processed
- pollutant code - The active pollutant code currently being processed
- decimal rounding - The EPA publishes a requirement on how many decimal places to round for each pollutant
- rolling average - The EPA publishes a required rolling average window (1-hour, 8-hour, or 24-hour) for each pollutant
- lower break points - Used to bucket index score
- upper break points - Used to bucket index score
Average Requirements
- Ozone = 8-hour
- PM2.5 = 24-hour
- PM10 = 24-hour
- CO = 8-hour
- SO2 = 1-hour
- NO2 = 1-hour
Decimal truncation
- Ozone = 3 places
- PM2.5 = 1 place
- PM10 = integer
- CO = 1 place
- SO2 = integer
- NO2 = integer
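One caveat: the EPA guidance calls for truncation, while the calc_aqi function below uses Pandas’ round(), which rounds to the nearest value. If strict truncation is desired, a small helper along these lines (a sketch, not part of the original code) could replace the round() call:

import numpy as np

def truncate(values, decimals):
    # Drop, rather than round, everything past the given number of decimal places
    factor = 10 ** decimals
    return np.trunc(values * factor) / factor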
The calc_aqi function returns the Air Quality Index score for the active county/site/pollutant combination being processed.
#AQI Calculation Function
def calc_aqi(df,county,site,param_code,param_rnd,avg_unit,breaks_lower,breaks_upper):
    # Filter to the county/site/pollutant combination passed in
    df_activePollutant = df.loc[(df['County Code'] == county) &
                                (df['Site Num'] == site) &
                                (df['Parameter Code'] == param_code)].copy()

    #Reverse the order of the data frame so Pandas rolling can use the "look back" window
    #Use Pandas rolling to calculate the avg_unit-hour rolling average
    df_activePollutant = df_activePollutant[::-1]
    df_activePollutant['Rolling Avg'] = df_activePollutant['Sample Measurement'].rolling(avg_unit, min_periods=avg_unit).mean()
    df_activePollutant['Rolling Avg'] = df_activePollutant['Rolling Avg'].round(param_rnd)
    df_activePollutant = df_activePollutant[pd.notnull(df_activePollutant['Rolling Avg'])]

    # Calculate AQI for each measurement observation in the current processing data frame
    for k, row in df_activePollutant.iterrows():
        aqiAvg = row["Rolling Avg"]
        if np.isnan(aqiAvg) == False:
            aqiLow = get_range(breaks_lower, aqiAvg)
            aqiHigh = get_range(breaks_upper, aqiAvg)
            if aqiLow is not None:
                # Look up the concentration breakpoint range that maps to this lower index value
                breakRange = [key for key in breaks_lower.items() if key[1] == aqiLow][0][0]
                breakRangeLow = breakRange[0]
                breakRangeHigh = breakRange[1]
                # Linear interpolation between the index breakpoints
                rowAqi = (aqiHigh - aqiLow)/(breakRangeHigh - breakRangeLow)*(aqiAvg - breakRangeLow) + aqiLow
                rowAqi = int(rowAqi)
                df_activePollutant.loc[k,'AQI'] = rowAqi
                #print(aqiAvg, aqiLow, rowAqi)
    return df_activePollutant
Here is the math behind rowAqi:
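This is the standard EPA linear interpolation. In the code’s variable names:

rowAqi = (aqiHigh - aqiLow) / (breakRangeHigh - breakRangeLow) × (aqiAvg - breakRangeLow) + aqiLow

Or, in the EPA’s notation: AQI = (I_high - I_low) / (BP_high - BP_low) × (C - BP_low) + I_low, where C is the rolling-average concentration, BP_low and BP_high are the concentration breakpoints that bracket C, and I_low and I_high are the corresponding index breakpoints.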
You will notice the above function makes use of high and low ranges; those are defined here:
# Define AQI Lower and Upper Bound Lookup Tables

#Ozone
ozone_breaks_lower = {(0.000, 0.054): 0, (0.055, 0.070): 51, (0.071, 0.085): 101,
                      (0.086, 0.105): 151, (0.106, 0.200): 201, (0.405, 0.504): 301,
                      (0.505, 0.604): 401}
ozone_breaks_upper = {(0.000, 0.054): 50, (0.055, 0.070): 100, (0.071, 0.085): 150,
                      (0.086, 0.105): 200, (0.106, 0.200): 300, (0.405, 0.504): 400,
                      (0.505, 0.604): 500}

#PM2.5
pm25_breaks_lower = {(0.0, 12.0): 0, (12.1, 35.4): 51, (35.5, 55.4): 101,
                     (55.5, 150.4): 151, (150.5, 250.4): 201, (250.5, 350.4): 301,
                     (350.5, 500.4): 401}
pm25_breaks_upper = {(0.0, 12.0): 50, (12.1, 35.4): 100, (35.5, 55.4): 150,
                     (55.5, 150.4): 200, (150.5, 250.4): 300, (250.5, 350.4): 400,
                     (350.5, 500.4): 500}

#PM10
pm10_breaks_lower = {(0.0, 54): 0, (55, 154): 51, (155, 254): 101,
                     (255, 354): 151, (355, 424): 201, (425, 504): 301,
                     (505, 604): 401}
pm10_breaks_upper = {(0.0, 54): 50, (55, 154): 100, (155, 254): 150,
                     (255, 354): 200, (355, 424): 300, (425, 504): 400,
                     (505, 604): 500}

#CO
co_breaks_lower = {(0.0, 4.4): 0, (4.5, 9.4): 51, (9.5, 12.4): 101,
                   (12.5, 15.4): 151, (15.5, 30.4): 201, (30.5, 40.4): 301,
                   (40.5, 50.4): 401}
co_breaks_upper = {(0.0, 4.4): 50, (4.5, 9.4): 100, (9.5, 12.4): 150,
                   (12.5, 15.4): 200, (15.5, 30.4): 300, (30.5, 40.4): 400,
                   (40.5, 50.4): 500}

#SO2
so2_breaks_lower = {(0.0, 35): 0, (36, 75): 51, (76, 185): 101,
                    (186, 304): 151, (305, 604): 201, (605, 804): 301,
                    (805, 1004): 401}
so2_breaks_upper = {(0.0, 35): 50, (36, 75): 100, (76, 185): 150,
                    (186, 304): 200, (305, 604): 300, (605, 804): 400,
                    (805, 1004): 500}

#NO2
no2_breaks_lower = {(0.0, 53): 0, (54, 100): 51, (101, 360): 101,
                    (361, 649): 151, (650, 1249): 201, (1250, 1649): 301,
                    (1650, 2049): 401}
no2_breaks_upper = {(0.0, 53): 50, (54, 100): 100, (101, 360): 150,
                    (361, 649): 200, (650, 1249): 300, (1250, 1649): 400,
                    (1650, 2049): 500}
And finally, we have a helper function used to look up those ranges:
# Function to return lower and upper AQI ranges
def get_range(table, measurement):
    for key in table:
        if key[0] <= measurement <= key[1]:
            return table[key]
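For example, an 8-hour ozone average of 0.080 ppm falls in the (0.071, 0.085) band, so:

get_range(ozone_breaks_lower, 0.080)  # returns 101
get_range(ozone_breaks_upper, 0.080)  # returns 150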
Create Daily Summary and Export
If you are still with me: we have now processed all of our data and calculated a rolling-average AQI for each pollutant measured, but the data is still a bit too granular for our analysis, so we will do some final summarization before we move our work into MapD.
Our final step will be to create a "daily summary" and export it to CSV:
# Create a daily summary
dfDaily = dfCombined.groupby(['County Code', 'Site Num', 'Latitude', 'Longitude',
                              'AQS Parameter Desc', 'Date Local'],
                             sort=False)['AQI'].max().reset_index()

aqi_desc = {(0, 50): "Good", (51, 100): "Moderate", (101, 150): "Unhealthy for Sensitive Groups",
            (151, 200): "Unhealthy", (201, 300): "Very Unhealthy", (301, 500): "Hazardous"}

aqi_color = {(0, 50): "Green", (51, 100): "Yellow", (101, 150): "Orange",
             (151, 200): "Red", (201, 300): "Purple", (301, 500): "Maroon"}

for l, row in dfDaily.iterrows():
    aqi = row["AQI"]
    aqiDesc = get_range(aqi_desc, aqi)
    aqiColor = get_range(aqi_color, aqi)
    dfDaily.loc[l, 'AQI Description'] = aqiDesc
    dfDaily.loc[l, 'AQI Color'] = aqiColor

outputName = '_out/' + year + '_daily.csv'
dfDaily.to_csv(outputName, sep=',', index=False)
Let's start by creating a new Dataframe by grouping our data by County, Site, Lat, Lon, Pollutant, and Date. Let's also use this as an opportunity to select the maximum AQI measurement for each day.
# Create a daily summary
dfDaily = dfCombined.groupby(['County Code', 'Site Num', 'Latitude', 'Longitude',
                              'AQS Parameter Desc', 'Date Local'],
                             sort=False)['AQI'].max().reset_index()
While we now have a daily index score, let’s map it to something that is easier to understand; for that, we will use the categories defined by the EPA.
aqi_desc = {(0, 50): "Good", (51, 100): "Moderate", (101, 150): "Unhealthy for Sensitive Groups",
            (151, 200): "Unhealthy", (201, 300): "Very Unhealthy", (301, 500): "Hazardous"}

aqi_color = {(0, 50): "Green", (51, 100): "Yellow", (101, 150): "Orange",
             (151, 200): "Red", (201, 300): "Purple", (301, 500): "Maroon"}
Now we can iterate over each row in our Dataframe and assign a description (Good, Moderate, etc.) and a color (Green, Yellow, etc.) for each day and pollutant measured.
for l, row in dfDaily.iterrows():
    aqi = row["AQI"]
    aqiDesc = get_range(aqi_desc, aqi)
    aqiColor = get_range(aqi_color, aqi)
    dfDaily.loc[l, 'AQI Description'] = aqiDesc
    dfDaily.loc[l, 'AQI Color'] = aqiColor
And finally, we will output our summarized data into a CSV file that we will import into MapD.
outputName = '_out/' + year + '_daily.csv'
dfDaily.to_csv(outputName, sep=',', index=False)
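As a quick sanity check before loading into MapD (assuming year = '2016', so the file is _out/2016_daily.csv), the export can be read back into Pandas:

dfCheck = pd.read_csv('_out/2016_daily.csv')
print(dfCheck.head())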
That was a heavy, heavy post; thank you for staying with us. Next we will shift our work into MapD, where we will use Shapefiles and assign AQI stations to create an eye-catching visualization.
About Randy Zwitch and MapD
Randy Zwitch is an accomplished data science and analytics professional with broad industry experience in big data and data science. He is an open-source contributor in the R and Julia programming language communities, with specialties in big data, data visualization, relational databases, predictive modeling and machine learning, and data engineering.
Randy is the Senior Developer Advocate at MapD and is a highly sought after speaker and lecturer.
MapD is an extreme analytics platform, used in business and government to find insights in data beyond the limits of mainstream analytics tools. The MapD platform delivers zero-latency querying and visual exploration of big data, dramatically accelerating operational analytics, data science, and geospatial analytics.
Originating from research at MIT, MapD is a technology breakthrough, harnessing the massive parallel computing of GPUs for data analytics. MapD Technologies Inc. is headquartered in San Francisco, and the platform is available globally via open source and enterprise license options.
Jason Thompson is the co-founder and CEO of 33 Sticks. In addition to being an amateur chef and bass player, he can also eat large amounts of sushi. As an analytics and optimization expert, he brings over 15 years of data experience, going back to being part of the original team at Omniture.