MIT License. Copyright (c) 2016 Jeffrey Alan Meyers. See LICENSE.md
This project contains a command line utility to log tweets from the Twitter Streaming API and load them into Amazon S3 buckets. You can also download Geofabrik OpenStreetMap extracts, filter out Point-of-Interest nodes, and load the data into S3 buckets. The OSM and Twitter log files in S3 are used as input for Elastic MapReduce jobs.
Example output after processing 2GB of tweets and the entire USA OSM extract with the poi_nearby_osm.py MapReduce job (metro area, number of tweets nearby, point-of-interest).
Dependencies
apt-get update
apt-get install python-dev libgeos-dev libspatialindex-dev \
build-essential protobuf-compiler libprotobuf-dev
# if you don't already have pip
curl https://bootstrap.pypa.io/get-pip.py | python
Install geotweet command line utility
pip install geotweet
Installing this package will provide you with a Python executable named geotweet.
geotweet stream|load|osm [options]
geotweet stream --help # store Twitter Streaming API output to log files
geotweet load --help # load log files to S3 bucket
geotweet osm --help # download osm extracts from geofabrik
# extract POI nodes and load into S3 bucket
Store geographic tweets from the Twitter Streaming API into --log-dir
usage: geotweet stream [-h] [--log-dir LOG_DIR] [--log-interval LOG_INTERVAL]
[--bbox BBOX]
optional arguments:
-h, --help show this help message and exit
--log-dir LOG_DIR Path to log file directory
--log-interval LOG_INTERVAL
Minutes in each log file
--bbox BBOX Bounding Box as 'SW,NE' using 'Lon,Lat' for each
point.
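The --bbox value is a comma-separated 'SW,NE' pair with each point in 'Lon,Lat' order. A minimal sketch of parsing such a value (parse_bbox is a hypothetical helper for illustration, not part of geotweet):

```python
def parse_bbox(value):
    """Parse 'sw_lon,sw_lat,ne_lon,ne_lat' into SW and NE (lon, lat) points."""
    parts = [float(p) for p in value.split(",")]
    if len(parts) != 4:
        raise ValueError("expected 'sw_lon,sw_lat,ne_lon,ne_lat'")
    sw, ne = (parts[0], parts[1]), (parts[2], parts[3])
    if sw[0] >= ne[0] or sw[1] >= ne[1]:
        raise ValueError("SW corner must lie south-west of NE corner")
    return sw, ne

# rough bounding box for the Continental US
sw, ne = parse_bbox("-125.0,24.0,-66.0,50.0")
```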
Listen for archived files in --log-dir and upload to S3 bucket
usage: geotweet load [-h] [--log-dir LOG_DIR] [--bucket BUCKET]
[--region REGION]
optional arguments:
-h, --help show this help message and exit
--log-dir LOG_DIR Path to log file directory
--bucket BUCKET AWS S3 Bucket name
--region REGION AWS S3 Region such as 'us-west-2'
Download OSM extracts from GeoFabrik, extract POI nodes and load to S3 Bucket
usage: geotweet osm [-h] [--output OUTPUT] [--states STATES] [--bucket BUCKET]
[--region REGION]
optional arguments:
-h, --help show this help message and exit
--output OUTPUT Location of output files (default=/tmp)
--states STATES File containing list of states to download and load
--bucket BUCKET AWS S3 Bucket name
--region REGION AWS S3 Region such as 'us-west-2'
For geotweet stream the following environment variables must be set.
See example_conf/stream-envvars.sh for all options.
TWITTER_CONSUMER_KEY
TWITTER_CONSUMER_SECRET
TWITTER_ACCESS_TOKEN_KEY
TWITTER_ACCESS_TOKEN_SECRET
For geotweet load|osm the following environment variables must be set.
See example_conf/load-envvars.sh for all options.
AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY
AWS_BUCKET (if not provided as cli param)
AWS_DEFAULT_REGION (if not provided as cli param)
pip install geotweet
# Twitter
export TWITTER_CONSUMER_KEY="..."
export TWITTER_CONSUMER_SECRET="..."
export TWITTER_ACCESS_TOKEN_KEY="..."
export TWITTER_ACCESS_TOKEN_SECRET="..."
# AWS
export AWS_ACCESS_KEY_ID="..."
export AWS_SECRET_ACCESS_KEY="..."
# start streaming to log files, rotating the log file every 5 minutes
geotweet stream --log-dir /tmp/geotweet --log-interval 5
# Open a new terminal
# start loading archived log rotations to S3
geotweet load --log-dir /tmp/geotweet --bucket already.created.bucket --region us-west-2
# To download and load OSM data to S3
echo -e "Oregon\nWashington" > states.txt
geotweet osm --bucket already.created.bucket --region us-west-2 --states states.txt
To run as a daemon on Ubuntu with Upstart, copy
example_conf/geotweet-stream.conf and example_conf/geotweet-load.conf
to /etc/init and set the environment variables in those files then run:
sudo service geotweet-stream start
sudo service geotweet-load start
Run geotweet stream
A Python script running on a cheap VPS (DigitalOcean) connects to the Twitter Streaming API and filters for tweets inside the Continental US.
For each tweet that includes Lat-Lon coordinates, the script extracts a subset of fields, marshals them as JSON, and appends the result to a log file. The log files are rotated every 60 minutes.
Example log entry (a single line in the log, pretty-printed here)
{
"source" : "<a href=\"http://www.tweet3po.org\" rel=\"nofollow\">Tweet3po</a>",
"followers_count" : 959,
"screen_name" : "Tweet3po",
"tweet_id" : "712897292534087681",
"friends_count" : 5,
"location" : "Orlando, FL",
"timestamp" : "1458802934188",
"text" : "#HouseBusinessCheck 1750 Mills Ave N 32803 (3/24 02:45) #Orlando #LakeFormosa",
"created_at" : "Thu Mar 24 07:02:14 +0000 2016",
"user_id" : 56266341,
"description" : "Hyper-Local Neighborhood News.",
"name" : "Tweet3po",
"lonlat" : [
-81.36450067,
28.56774084
]
}
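A sketch of how a raw status object could be reduced to such an entry. The field names mirror the example above, but extract_entry is a hypothetical helper and the raw-status layout is an assumption, not geotweet's actual code:

```python
import json

def extract_entry(status):
    """Pull the logged fields out of a raw Twitter status dict (assumed layout)."""
    user = status["user"]
    return {
        "tweet_id": status["id_str"],
        "text": status["text"],
        "created_at": status["created_at"],
        "timestamp": status["timestamp_ms"],
        "user_id": user["id"],
        "screen_name": user["screen_name"],
        "name": user["name"],
        "description": user["description"],
        "location": user["location"],
        "followers_count": user["followers_count"],
        "friends_count": user["friends_count"],
        "source": status["source"],
        # GeoJSON Point coordinates are [lon, lat]
        "lonlat": status["coordinates"]["coordinates"],
    }

def append_to_log(status, path):
    """Marshal one entry as a single JSON line and append it to the log file."""
    with open(path, "a") as log:
        log.write(json.dumps(extract_entry(status)) + "\n")
```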
Run geotweet load
Listen for log file rotations. Each archived file will be uploaded into an Amazon S3 Bucket.
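The detection half of that loop might look like the following sketch; the file-naming scheme is an assumption, and the actual S3 upload (done with boto in the real tool) is elided:

```python
import glob
import os

def find_archived(log_dir, active="geotweet.log"):
    """Return rotated log files ready for upload, oldest first.

    The active log is still being written, so it is skipped; every
    other file in the directory is assumed to be a finished rotation.
    """
    paths = glob.glob(os.path.join(log_dir, "*"))
    archived = [p for p in paths if os.path.basename(p) != active]
    return sorted(archived, key=os.path.getmtime)

# Each returned path would then be uploaded to the S3 bucket and
# removed (or marked done) -- that half is omitted here.
```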
After log files have been collected for long enough, run some MapReduce jobs
git clone https://github.com/meyersj/geotweet.git
cd geotweet
virtualenv env
./env/bin/pip install -r requirements.txt # plus required system packages
Run MapReduce jobs
cd /path/to/geotweet/bin
MapReduce Word Count from tweets broken down by US, State and County (cached spatial lookup using Shapely/Rtree)
./mrjob_runner state-county-words
MapReduce Word Count of tweets broken down by Metro areas (cached spatial lookup using Shapely/Rtree).
Final results will be persisted to MongoDB if GEOTWEET_MONGODB_URI
is set to a valid uri.
export GEOTWEET_MONGODB_URI="mongodb://127.0.0.1:27017"
./mrjob_runner metro-words
Output is stored in MongoDB (db=geotweet, collection=metro_word) as documents
{
metro_area: "Portland, OR--WA",
word: "beautiful",
count: 142
}
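The map/reduce shape of the metro word count can be sketched in plain Python. The real job runs under mrjob and resolves the metro area with the cached Shapely/Rtree spatial lookup, which is stubbed out here as a plain callable:

```python
from collections import Counter

def mapper(tweet, lookup_metro):
    """Emit (metro, word) pairs for one tweet; lookup_metro stands in
    for the cached Shapely/Rtree spatial lookup."""
    metro = lookup_metro(tweet["lonlat"])
    if metro is None:
        return  # tweet falls outside every known metro area
    for word in tweet["text"].lower().split():
        yield (metro, word)

def reducer(pairs):
    """Count occurrences of each (metro, word) key and shape the result
    like the MongoDB documents shown above."""
    counts = Counter(pairs)
    return [
        {"metro_area": metro, "word": word, "count": n}
        for (metro, word), n in counts.items()
    ]
```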
Input is a log of geographic tweets and points-of-interest extracted from OSM. For each input record, look up the metro area and emit the data using the metro as the key.
In the reduce step, for each metro area, build an index of points-of-interest, run a spatial search for POIs near each tweet, and emit a count for each nearby POI.
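That reduce step can be pictured with the sketch below. The real job builds an Rtree index over the POIs; this sketch substitutes a naive linear scan, and the degree-based radius is an arbitrary assumption:

```python
from collections import Counter

def nearby_poi_counts(tweets, pois, radius=0.001):
    """For one metro area, count tweets falling within `radius` degrees
    of each point-of-interest (linear scan instead of an Rtree index)."""
    counts = Counter()
    for tweet in tweets:
        tlon, tlat = tweet["lonlat"]
        for poi in pois:
            plon, plat = poi["lonlat"]
            if abs(tlon - plon) <= radius and abs(tlat - plat) <= radius:
                counts[poi["name"]] += 1
    return counts
```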
Final results will be persisted to MongoDB if GEOTWEET_MONGODB_URI
is set to a valid uri.
export GEOTWEET_MONGODB_URI="mongodb://127.0.0.1:27017"
./mrjob_runner poi-nearby
Output is stored in MongoDB (db=geotweet, collection=metro_osm) as documents
{
metro_area: "Portland, OR--WA",
poi: "Stumptown",
count: 124
}
Run EMR Job
First build a package that will be used to bootstrap the Hadoop nodes
cd /path/to/repo/geotweet
# build and store package in $PWD/dist
python setup.py sdist
Set all of the required config parameters and paths
cp example_conf/mrjob.conf ~/.mrjob.conf
vim ~/.mrjob.conf
Configure the job with the correct input and output buckets
cd /path/to/geotweet/bin
vim emrjob_runner # make sure you set the `src` and `dst` S3 buckets
./emrjob_runner state-county-words
./emrjob_runner metro-words
./emrjob_runner poi-nearby
Tests are available to run after cloning and installing dependencies.
nosetests geotweet/tests/unit/*
# requires environment variables specified above to be set
# and MongoDB instance running locally
nosetests geotweet/tests/integration/*
To build a local virtual machine with MongoDB you need VirtualBox/Vagrant installed and an ubuntu/trusty64 box
git clone https://github.com/meyersj/geotweet.git
cd geotweet
vagrant box add ubuntu/trusty64
vagrant up
vagrant ssh
cd /vagrant/bin
# run mapreduce job
./mrjob_runner poi-nearby
# connect to mongo shell and query results
mongo geotweet
db.metro_osm.find({
metro_area: "Portland, OR--WA"
}).sort({count:-1})