PySpark - Word Count Example


In this PySpark Word Count Example, we will learn how to count the occurrences of unique words in a text file. Along the way, we will use the Map-Reduce pattern, a basic building block of big data processing.

Python Program

from pyspark import SparkContext

if __name__ == "__main__":

    # create a Spark context that runs on the local machine
    sc = SparkContext("local", "PySpark Word Count Example")

    # read data from text file and split each line into words
    words = sc.textFile("D:/workspace/spark/input.txt").flatMap(lambda line: line.split(" "))

    # count the occurrence of each word
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

    # save the counts to output
    wordCounts.saveAsTextFile("D:/workspace/spark/output/")

    # release the resources held by the Spark context
    sc.stop()

Run this Python Spark Application.

> spark-submit pyspark_example.py

If the application runs without any error, an output folder should be created at the specified output path, D:/workspace/spark/output/.

If you try to run the application again, you may get an error in the console output as shown below.

Output

py4j.protocol.Py4JJavaError: An error occurred while calling o44.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/D:/workspace/spark/output already exists
        at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
        at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:287)
        at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1096)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)

This is because the output folder was created during the first run, and Spark does not overwrite an existing output directory. Before you run the application again, you need to explicitly delete the output folder.
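Deleting the folder by hand before every run can be automated. The following is a minimal sketch, assuming the output path is on the local file system. remove_output_dir is a hypothetical helper name and not part of PySpark, and the demonstration uses a temporary directory instead of the real output path.

```python
import shutil
import tempfile
from pathlib import Path


def remove_output_dir(path):
    """Delete the directory at `path` if it exists.

    Returns True if a directory was removed, False otherwise.
    Hypothetical helper for illustration; works on local paths only.
    """
    target = Path(path)
    if target.is_dir():
        shutil.rmtree(target)
        return True
    return False


# Demonstration on a throwaway directory rather than the real output path.
demo_dir = Path(tempfile.mkdtemp()) / "output"
demo_dir.mkdir()
(demo_dir / "part-00000").write_text("('Python', 2)\n")

first_call = remove_output_dir(demo_dir)   # the directory existed, so it is removed
second_call = remove_output_dir(demo_dir)  # nothing is left to remove
```

In the word count program, such a helper could be called with "D:/workspace/spark/output/" just before saveAsTextFile. Note that it deletes everything in that folder, so it should only ever point at a directory that holds generated output.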


Analyse the Input and Output of PySpark Word Count

Let us analyse the input and output of this example.

We have provided the following data in the input text file.

Python Lists allow us to hold items of heterogeneous types. In this article, we will learn how to create a list in Python; access the list items; find the number of items in the list, how to add an item to list; how to remove an item from the list; loop through list items; sorting a list, reversing a list; and many more transformation and aggregation actions on Python Lists.

The output folder contains the files written by Spark, including part-00000.

Open part-00000. The contents would be as shown below:

('Python', 2)
('Lists', 1)
('allow', 1)
('us', 1)
('to', 5)
('hold', 1)
('items', 2)
('of', 2)
('heterogeneous', 1)
('types.', 1)
('In', 1)
('this', 1)
('article,', 1)
('we', 1)
('will', 1)
('learn', 1)
('how', 3)
('create', 1)
('a', 3)
('list', 3)
('in', 2)
('Python;', 1)
('access', 1)
('the', 4)
('items;', 2)
('find', 1)
('number', 1)
('list,', 2)
('add', 1)
('an', 2)
('item', 2)
('list;', 3)
('remove', 1)
('from', 1)
('loop', 1)
('through', 1)
('sorting', 1)
('reversing', 1)
('and', 2)
('many', 1)
('more', 1)
('transformation', 1)
('aggregation', 1)
('actions', 1)
('on', 1)
('Lists.', 1)
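Each line of part-00000 is the string form of a Python tuple, so the file can be read back into Python for further processing. The following is a small sketch, assuming the lines look exactly like the listing above. parse_count_line is a hypothetical helper name, and the sample lines are copied from that listing rather than read from disk.

```python
import ast

# A few lines copied from the part-00000 listing.
sample_lines = [
    "('Python', 2)",
    "('to', 5)",
    "('list;', 3)",
]


def parse_count_line(line):
    """Turn a line such as "('to', 5)" back into the tuple ('to', 5)."""
    word, count = ast.literal_eval(line.strip())
    return word, count


# Build a dictionary of word -> count and look up the most frequent word.
counts = dict(parse_count_line(line) for line in sample_lines)
most_common = max(counts, key=counts.get)
```

ast.literal_eval only evaluates Python literals, which makes it a safer choice than eval for reading such lines.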

What have we done in PySpark Word Count?

We created a SparkContext with the master set to local, so the application runs on the local machine.

sc = SparkContext("local", "PySpark Word Count Example")

Next, we read the input text file using the SparkContext variable and applied flatMap to split each line into words. words is an RDD of strings, with one element per word.

words = sc.textFile("D:/workspace/spark/input.txt").flatMap(lambda line: line.split(" "))

We have split each line into words using a single space as the separator.
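Splitting on a single space explains why the output contains entries such as 'list,' and 'list;' next to 'list': punctuation stays attached to the word. The plain Python sketch below shows this behaviour on one sample line. The normalisation at the end (stripping punctuation and lowercasing) is an optional variation and an assumption of this sketch, not something the program above does.

```python
import string

line = "sorting a list,  reversing a list;"  # note the double space

# Splitting on a single space keeps punctuation and yields an empty
# string wherever two spaces appear in a row.
by_single_space = line.split(" ")

# Splitting with no argument treats any run of whitespace as one separator.
by_any_whitespace = line.split()

# Optional clean-up: strip surrounding punctuation and lowercase each word.
normalized = [word.strip(string.punctuation).lower() for word in line.split()]
```

With the normalised words, 'list,' and 'list;' would both be counted as 'list'.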

Then we map each word to a key:value pair of word:1, 1 being the number of occurrences.

words.map(lambda word: (word, 1))

The result is then reduced by key, which is the word, and the values are added.

reduceByKey(lambda a, b: a + b)
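To see what the map and reduceByKey steps compute, the same logic can be written in plain Python on a short list of words. This is only a single-machine illustration under the assumption that all data fits in memory. Spark itself performs the reduction in a distributed way across partitions, and the sample words below are made up for the sketch.

```python
from collections import defaultdict
from functools import reduce

words = ["to", "list", "to", "a", "list", "to"]

# Map step: pair every word with the count 1.
pairs = [(word, 1) for word in words]

# Group the values that share the same key (the word).
grouped = defaultdict(list)
for word, one in pairs:
    grouped[word].append(one)

# Reduce step: combine the values of each key with the same
# two-argument function that is passed to reduceByKey.
add = lambda a, b: a + b
word_counts = {word: reduce(add, ones) for word, ones in grouped.items()}
```

Because the function only ever combines two values at a time, it can be applied in any grouping, which is what lets Spark combine partial counts from different partitions.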

The result is saved as text in the output folder.

wordCounts.saveAsTextFile("D:/workspace/spark/output/")

Summary

Concluding this tutorial of Python Examples, we learned how to count the occurrences of unique words using PySpark.

