Map/Reduce on the Enron dataset

We are going to use EMR on the Enron email dataset: http://aws.amazon.com/datasets/enron-email-data/

https://en.wikipedia.org/wiki/Enron_scandal

This dataset contains 1,227,255 emails from Enron employees. The version we use consists of 50 GB of compressed files.


Consider the following scenario: On September 9, 2001 (really), The New York Times ran an article titled "MARKET WATCH; A Self-Inflicted Wound Aggravates Angst Over Enron" (http://www.webcitation.org/5tZ2mRM4U). Someone (your boss?) wants to find out who frequently talked to the press in the days before. You are handed a dump of the email server.

Technically, this task consists of the following steps:

-  Put the dataset into S3 (the ugly part, already done for you)

-  Extract the date/sender/recipient from the email data (this is what is described in detail below)

-  Filter the data to

   -  only consider emails between 2001-09-05 and 2001-09-08

   -  only consider messages going from Enron employees to someone not part of the organization

-  Count the number of foreign interactions and only include accounts that have more than one outside contact that week.
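Before turning to MapReduce, the filter and count steps above can be sketched in plain Python on a handful of rows. This is only an illustration under several assumptions that are not stated in the task: Enron accounts are recognised by the suffix "@enron.com", the date window includes all of 2001-09-08, and "more than one outside contact" is read as "at least two emails to outside addresses" (counting distinct recipients would be an equally plausible reading). The sample rows are made up.

```python
from collections import Counter
from datetime import datetime

# Assumption: Enron staff are recognised by this domain suffix.
ENRON_DOMAIN = "@enron.com"
# Assumption: the window runs from 2001-09-05 00:00 up to (not including) 2001-09-09 00:00.
WINDOW_START = datetime(2001, 9, 5)
WINDOW_END = datetime(2001, 9, 9)


def is_outside_contact(timestamp, sender, recipient):
    """Return True if the row is an Enron-to-outside email inside the window."""
    sent = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ")
    return (WINDOW_START <= sent < WINDOW_END
            and sender.endswith(ENRON_DOMAIN)
            and not recipient.endswith(ENRON_DOMAIN))


def frequent_outside_senders(rows, minimum=2):
    """Count outside emails per sender and keep senders with at least `minimum`."""
    counts = Counter(sender for ts, sender, recipient in rows
                     if is_outside_contact(ts, sender, recipient))
    return {sender: n for sender, n in counts.items() if n >= minimum}


# Hypothetical rows, made up for illustration only.
rows = [
    ("2001-09-05T10:00:00Z", "alice@enron.com", "reporter@example.com"),
    ("2001-09-06T11:30:00Z", "alice@enron.com", "editor@example.org"),
    ("2001-09-06T12:00:00Z", "bob@enron.com", "carol@enron.com"),
    ("2001-09-07T09:15:00Z", "bob@enron.com", "reporter@example.com"),
    ("2001-08-30T09:15:00Z", "bob@enron.com", "reporter@example.com"),
]
result = frequent_outside_senders(rows)
print(result)
```

In the MapReduce version, the filter would typically live in the mapper and the counting in the reducer.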

To achieve this, you need to create a set of MapReduce jobs. We are going to implement those in Python, using the Hadoop streaming feature also available in EMR.

http://hadoop.apache.org/docs/r1.2.1/streaming.html

If you are new to Python, check out http://www.afterhoursprogramming.com/tutorial/Python/Introduction/

Here is an example of using Python and Hadoop Streaming on EMR

https://dbaumgartel.wordpress.com/2014/04/10/an-elastic-mapreduce-streaming-example-with-python-and-ngrams-on-aws/ (not all details relevant here).

In Hadoop Streaming, Python MapReduce programs are given a part of the input data on the standard system input (stdin) and are expected to write tab-separated tables on the standard output (stdout). Here is a working skeleton for a map or reduce function:

#!/usr/bin/env python

import sys

for line in sys.stdin:
    line = line.strip().split('\t')
    # do something with line[0], line[1] etc.
    print("Some_Key\tSome Payload")

The reducer counterpart starts out very similarly, but has one important difference: all values with the same key from the mapper will follow each other, which allows them to be combined.
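Because equal keys arrive next to each other, a reducer can group them with itertools.groupby from the standard library. The following is a minimal sketch, not a required structure; the helper names parse and reduce_stream and the sample lines are made up for illustration.

```python
from itertools import groupby


def parse(lines):
    """Yield (key, value) pairs from tab-separated lines, skipping malformed ones."""
    for line in lines:
        parts = line.rstrip("\n").split("\t", 1)
        if len(parts) == 2:
            yield parts[0], parts[1]


def reduce_stream(lines):
    """Yield (key, list_of_values), relying on equal keys being adjacent."""
    for key, group in groupby(parse(lines), key=lambda pair: pair[0]):
        yield key, [value for _, value in group]


# Hypothetical mapper output, already sorted by key as Hadoop would deliver it.
sample = ["alice\t1\n", "alice\t1\n", "bob\t1\n"]

# In a real streaming job, pass sys.stdin instead of `sample`.
for key, values in reduce_stream(sample):
    # Replace len(values) with whatever aggregation the job needs.
    print(key + "\t" + str(len(values)))
```

Note that groupby only merges neighbouring records, so this works on sorted input only, which is what Hadoop provides to each reducer.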

First, let's start a small cluster. Log into AWS at https://console.aws.amazon.com/elasticmapreduce/home?region=us-east-1

This time, the simple configuration is fine, and most of the defaults can stay the way they are.

Some version numbers might be higher/newer than in the screenshot(s) below, but that should be fine.

For hardware configuration, we are going to start with a 2-node cluster of m1.large instances.

Wait until your cluster has started ("Waiting").

While you are waiting, we need to create an S3 bucket for the output of our analysis.

Click "Create Bucket"

Select the "US Standard" region and a name for your bucket. This name needs to be globally unique for S3. Then click "Create".

Back to EMR:

First, we are going to run the data transformation on a small part of the dataset. On your cluster page (the cluster is hopefully ready by now), select "Steps", then "Add Step".

Select Step Type "Streaming Program" and give it a name.

Further, set Mapper to

s3://enron-scripts/enron-etl.py

Reducer to

cat

Input S3 Location to

s3://enron-scripts/enron-urls-small.txt

And output S3 location to

s3://enron-results/t1

(replace enron-results with the S3 bucket name you just created.)

Then click "Add".

You will see the MapReduce job starting, going from "Pending", to "Running" and then hopefully to "Completed". If something goes wrong, inspect the log files! If all went well, it is time to inspect the results in S3.

Right-click the file “part-00000” and download it to inspect its contents. You will find three columns, separated by a tab (\t) character, containing (1) a timestamp, (2) a sender email address, and (3) a recipient email address, respectively. In other words, the “enron-etl.py” script you just ran extracted from the raw data exactly the information required for the analysis described above, i.e., for your task.

2001-04-18T02:58:00Z

2001-04-18T02:58:00Z

2001-04-18T02:58:00Z

2001-04-18T16:11:00Z

2001-04-18T16:11:00Z

2001-04-19T03:38:00Z

2001-04-19T03:38:00Z

2001-04-19T03:38:00Z

2001-04-19T03:38:00Z

In fact, AWS/EMR/Hadoop might choose to use more than one reducer (check “View jobs” and “View tasks” as described below for details), and then the result will be distributed over more than one file. In my latest test, AWS/EMR/Hadoop used two reducers for my job, resulting in two files, i.e., “part-00000” and “part-00001”.

This file (or these files) will be the input for your next MapReduce job as described above.

(Tip: If you set “Input S3 Location” not to a file (such as “s3://enron-scripts/enron-urls-small.txt” in the above example) but to a directory (folder), e.g., the “s3://enron-results/t1” result folder you used above, AWS/EMR/Hadoop will automatically iterate over all files in that directory, i.e., you do not need to concatenate them yourself.)
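When inspecting downloaded results on your own machine, the same "treat the folder as one input" idea can be mimicked with a few lines of Python. This is a sketch only: the function name read_rows is made up, and the demo writes two tiny hypothetical part files into a temporary directory instead of using real results.

```python
import glob
import os
import tempfile


def read_rows(directory):
    """Yield (timestamp, sender, recipient) from every part-* file in `directory`."""
    for path in sorted(glob.glob(os.path.join(directory, "part-*"))):
        with open(path) as handle:
            for line in handle:
                fields = line.strip().split("\t")
                if len(fields) == 3:  # skip malformed lines
                    yield tuple(fields)


# Demo with two tiny hypothetical part files in a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "part-00000"), "w") as f:
        f.write("2001-04-18T02:58:00Z\ta@example.com\tb@example.com\n")
    with open(os.path.join(tmp, "part-00001"), "w") as f:
        f.write("2001-04-19T03:38:00Z\tc@example.com\td@example.com\n")
        f.write("broken line without tabs\n")
    rows = list(read_rows(tmp))

print(rows)
```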

Create a mapper.py and a reducer.py script, upload them to your S3 bucket, point to them when creating a "Streaming Program" step, and run them.

See the skeleton further up for an example. The Mapper is expected to output a key and a value separated by a tab (\t) character. As mentioned in the slides, the Mapper typically filters records and outputs them with a common key, and the Reducers read the records sharing a key and output an aggregation.

Here are examples for Hadoop Streaming Mappers and Reducers doing Wordcount (text files are available at

http://homepages.cwi.nl/~manegold/UvA-ABS-MBA-BDBA-BDIT-2017/Wordcount-Mapper.py

and

http://homepages.cwi.nl/~manegold/UvA-ABS-MBA-BDBA-BDIT-2017/Wordcount-Reducer.py):

Mapper

#!/usr/bin/env python
import sys
# line-wise iterate over standard input (stdin)
for line in sys.stdin:
    # split line (after stripping off any leading/trailing whitespace)
    # on whitespaces into "words"
    words = line.strip().split()
    # iterate over all words of a line
    for word in words:
        # print word (after stripping off any leading/trailing
        # whitespace) as key and number "1" as value
        # as tab-('\t')-separated (key,value) pair
        print(word.strip() + "\t1")


Reducer

#!/usr/bin/env python
import sys
# initialize variables
current_count = 0
current_word = ""
# line-wise iterate over standard input (stdin)
# (recall, each line is expected to consist of a tab-('\t')-separated
# (key,value) pair)
for line in sys.stdin:
    # split line (after stripping off any leading/trailing whitespace)
    # on tab ('\t') into key & value
    line = line.strip().split('\t')
    # sanity check: did we indeed get exactly two parts (key & value)?
    # if not, skip line and continue with next line
    if len(line) != 2:
        continue
    # extract key
    key = line[0]
    # new (next) key
    # (recall, keys are expected to arrive in sorted order)
    if (key != current_word):
        if (current_count > 0):
            # print previous key and aggregated count
            # as tab-('\t')-separated (key,value) pair
            print(current_word + '\t' + str(current_count))
        # reset counter to 0 and recall new key
        current_count = 0
        current_word = key
    # increment count by 1
    current_count += 1
if (current_count > 0):
    # print last key and its aggregated count
    # as tab-('\t')-separated (key,value) pair
    print(current_word + '\t' + str(current_count))

If anything goes wrong (which is likely in the beginning), you should inspect the log files provided for the EMR Step. It could take a few minutes for them to appear in the Web interface. Also check the logs for failing tasks! Finally, make sure each job's output directory does not exist yet in S3, otherwise the job will fail.
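If you prefer to check for an existing output directory from a script rather than in the S3 console, something along the following lines might work. It is a sketch under assumptions: for real use it expects the boto3 library to be installed and AWS credentials to be configured, neither of which this tutorial sets up. To keep the example runnable without AWS access, it is exercised here with a small stand-in client (FakeS3Client, made up for illustration) that mimics only the single call used.

```python
def output_exists(s3_client, bucket, prefix):
    """Return True if any object already lives under s3://bucket/prefix."""
    response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1)
    return response.get("KeyCount", 0) > 0


class FakeS3Client:
    """Stand-in for boto3.client("s3") so the sketch runs without AWS access."""

    def __init__(self, keys):
        self.keys = keys

    def list_objects_v2(self, Bucket, Prefix, MaxKeys):
        matches = [k for k in self.keys if k.startswith(Prefix)][:MaxKeys]
        return {"KeyCount": len(matches)}


# Hypothetical bucket contents after the first test run.
fake = FakeS3Client(["t1/part-00000", "t1/_SUCCESS"])
print(output_exists(fake, "enron-results", "t1/"))  # True: t1 is already taken
print(output_exists(fake, "enron-results", "t2/"))  # False: t2 is free to use
```

With real credentials, the first argument would be boto3.client("s3") and the bucket name your own.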

For local (i.e., on your laptop) prototyping of your Map() and Reduce() scripts, follow the instructions on the course website, replacing “kjv.txt” with the “part-00000” / “part-00001” files created and downloaded above.
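Independently of the course website's instructions, the map, sort, and reduce phases can also be imitated inside a single Python program, which is handy for quick experiments. The sketch below is a pure-Python stand-in, not how Hadoop itself works internally; the names run_local, wordcount_map, and wordcount_reduce are made up, and the input lines are a toy example.

```python
from itertools import groupby


def wordcount_map(line):
    """Emit (word, 1) for every whitespace-separated word of a line."""
    for word in line.strip().split():
        yield word, 1


def wordcount_reduce(key, values):
    """Emit the key together with the sum of its values."""
    yield key, sum(values)


def run_local(lines, mapper, reducer):
    """Simulate Hadoop Streaming: map every line, sort by key, reduce per key."""
    mapped = [pair for line in lines for pair in mapper(line)]
    mapped.sort(key=lambda pair: pair[0])  # stands in for Hadoop's shuffle/sort
    results = []
    for key, group in groupby(mapped, key=lambda pair: pair[0]):
        results.extend(reducer(key, [value for _, value in group]))
    return results


lines = ["to be or not to be", "to do"]
output = run_local(lines, wordcount_map, wordcount_reduce)
for word, count in output:
    print(word + "\t" + str(count))
```

Swapping in your own mapper and reducer functions lets you test the filtering and counting logic on the downloaded part files before paying for cluster time.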

Larger Dataset

To run the ETL (and your subsequent job) on the larger dataset, create a step as follows:

Select Step Type "Custom JAR" and give it a name.

Set JAR location to

command-runner.jar

Set Arguments to

hadoop-streaming -Dmapred.map.tasks=100 -files s3://enron-scripts/enron-etl.py -mapper enron-etl.py -reducer cat -input s3://enron-scripts/enron-urls.txt -output s3://enron-results/f1

(replace enron-results with the S3 bucket name you just created.)

Note: This is a normal Hadoop streaming job, too, but for complicated reasons we need to set a custom MapReduce parameter.

NOTE: This is essentially the same ETL (extract, transform, load) job as above, but now for the entire dataset rather than only a small subset. Thus, it generates the same three-column, tab-(\t)-separated result containing (1) a timestamp, (2) a sender email address, and (3) a recipient email address, respectively, but now in s3://.../f1/ rather than s3://.../t1/ . Hence, for the assignment, you need to run your own Map() & Reduce() jobs on the result of this large ETL job, just as you ran the word-count example above on the result of the small ETL job, but now using s3://.../f1/ as input rather than s3://.../t1/ .
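To see how the argument string above is put together, it can help to build it from its parts. The helper below is a hypothetical convenience (the name streaming_args and its parameters are made up); it simply reassembles the exact arguments shown above, which may be useful if you script several runs with different input or output folders.

```python
def streaming_args(script_url, input_url, output_url, map_tasks=100, reducer="cat"):
    """Build the hadoop-streaming argument list used in the "Custom JAR" step."""
    script_name = script_url.rsplit("/", 1)[-1]  # file name as seen on the nodes
    return [
        "hadoop-streaming",
        "-Dmapred.map.tasks=%d" % map_tasks,  # the custom MapReduce parameter
        "-files", script_url,                 # ship the script to the nodes
        "-mapper", script_name,
        "-reducer", reducer,
        "-input", input_url,
        "-output", output_url,
    ]


args = streaming_args(
    "s3://enron-scripts/enron-etl.py",
    "s3://enron-scripts/enron-urls.txt",
    "s3://enron-results/f1",  # replace enron-results with your own bucket
)
print(" ".join(args))
```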

Then click "Add".

After the Step has started, inspect its Mapper tasks:

Scroll down to inspect the large number of Mapper tasks. In the current state, your cluster will take a long time to finish all those. But since this is the cloud, we can simply request more firepower: On your cluster details page, select "Resize"

Increase the "Core Instance Group" to a count of 5 like so:

Once the additional nodes are available, the Step will process much faster. After it has been completed, run your MapReduce job on the larger results.

Once finished, again make sure to shut down your EMR cluster!

ETL Script for reference (plain text file available at http://homepages.cwi.nl/~manegold/UvA-ABS-MBA-BDBA-BDIT-2017/enron-etl.py):

#!/usr/bin/env python

# this turns enron email archive into tuples (date, from, to)

import sys
import zipfile
import tempfile
import email
import time
import datetime
import os
import urllib

# stdin is list of URLs to data files
for u in sys.stdin:
    u = u.strip()
    if not u:
        continue
    tmpf = tempfile.mkstemp()
    urllib.urlretrieve(u, tmpf[1])
    try:
        zip = zipfile.ZipFile(tmpf[1], 'r')
    except:
        continue
    txtf = [i for i in zip.infolist() if i.filename.endswith('.txt')]
    for f in txtf:
        msg = email.message_from_file(zip.open(f))
        tostr = msg.get("To")
        fromstr = msg.get("From")
        datestr = msg.get("Date")
        if (tostr is None or fromstr is None or datestr is None):
            continue
        toaddrs = [email.utils.parseaddr(a) for a in tostr.split(',')]
        fromaddr = email.utils.parseaddr(fromstr)[1].replace('\'','').strip().lower()
        try: # datetime hell, convert custom time zone stuff to UTC
            dt = datetime.datetime.strptime(datestr[:25].strip(), '%a, %d %b %Y %H:%M:%S')
            dt = dt + datetime.timedelta(hours = int(datestr[25:].strip()[:3]))
        except ValueError:
            continue
        if not '@' in fromaddr or '/' in fromaddr:
            continue
        for a in toaddrs:
            if (not '@' in a[1] or '/' in a[1]):
                continue
            ta = a[1].replace('\'','').strip().lower()
            print dt.isoformat() + 'Z\t' + fromaddr + '\t' + ta
    zip.close()
    os.remove(tmpf[1])
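The reference script above is written for Python 2 (print statement, urllib.urlretrieve). For readers prototyping in Python 3, the date handling could alternatively lean on the standard library, as in the sketch below. This is not what the script on the cluster does: as far as we can tell, the script adds the parsed offset to the local time, whereas a conversion to UTC in the usual sense subtracts it, so the two approaches can yield different timestamps. The function name to_utc_iso and the header value are made up for illustration.

```python
from datetime import timezone
from email.utils import parsedate_to_datetime


def to_utc_iso(date_header):
    """Convert an RFC 2822 Date header to an ISO 8601 UTC string."""
    # Yields a timezone-aware datetime when the header carries a numeric offset.
    local = parsedate_to_datetime(date_header)
    return local.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


# Hypothetical header value, made up for illustration.
print(to_utc_iso("Wed, 18 Apr 2001 02:58:00 -0700"))  # 2001-04-18T09:58:00Z
```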