28/06/2013

[Python] Map/Reduce examples

These are simple algorithms written following the Map/Reduce model in Python. To successfully run them, you must download this archive containing:
  • MapReduce.py and MapReduce.pyc: a simple implementation of the Map/Reduce programming model in Python
  • books.json, dna.json, friends.json, matrix.json and records.json: simple datasets
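If the archive is not at hand, the sketch below is a hypothetical minimal stand-in for MapReduce.py. It is written only against the calls the scripts in this post make (MapReduce(), emit_intermediate, emit, execute and the intermediate dictionary). It assumes, without confirmation from the original archive, that each input line holds one JSON-encoded record and that results are printed as JSON, one per line.

```python
import json


class MapReduce:
    """Hypothetical stand-in for the archive's MapReduce.py (interface inferred from the scripts)."""

    def __init__(self):
        self.intermediate = {}  # key -> list of values emitted by the mappers
        self.result = []        # records emitted by the reducers

    def emit_intermediate(self, key, value):
        # group values by key, playing the role of the shuffle phase
        self.intermediate.setdefault(key, []).append(value)

    def emit(self, value):
        self.result.append(value)

    def execute(self, data, mapper, reducer):
        # map phase: assumes one JSON-encoded record per input line
        for line in data:
            if line.strip():
                mapper(json.loads(line))
        # reduce phase: one reducer call per distinct intermediate key
        for key in self.intermediate:
            reducer(key, self.intermediate[key])
        # print every output record as JSON, one per line
        for item in self.result:
            print(json.dumps(item))
```

Saved as MapReduce.py next to the scripts, it should let them run unchanged, since they only do import MapReduce and mr = MapReduce.MapReduce().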

All these tasks come from the third assignment in the Big Data Coursera course I followed.

The first task used the books dataset to build an inverted index. Each input record is a two-element list, [document_id, text], where document_id is the document identifier and text is the content of the document, both formatted as strings. The output should be a (word, document ID list) tuple, where word is a string and document ID list is a list of strings.

 import MapReduce
 import sys
 
 mr = MapReduce.MapReduce()
 
 def mapper(record):
     # key: document identifier
     # value: document contents
     key = record[0]
     value = record[1]
     words = value.split()
     for w in words:
         # emit one (word, document_id) pair per word
         mr.emit_intermediate(w, key)
 
 def reducer(key, list_of_values):
     # key: word
     # list_of_values: IDs of the documents containing the word (possibly repeated)
     total = []
     # append each document ID to the word's index only if it is not already there
     for v in list_of_values:
         if v not in total:
             total.append(v)
     mr.emit((key, total))
 
 if __name__ == '__main__':
     inputdata = open(sys.argv[1])
     mr.execute(inputdata, mapper, reducer)

You can test the solution by running:

python inverted_index.py books.json
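The framework hides the shuffle step between mapper and reducer. As an illustration only (not necessarily how MapReduce.py does it), this sketch runs the same inverted-index logic on a tiny made-up dataset and performs the shuffle explicitly: it sorts the intermediate (word, document_id) pairs and groups them with itertools.groupby.

```python
from itertools import groupby
from operator import itemgetter


def map_phase(records):
    # emit one (word, document_id) pair per word occurrence
    for doc_id, text in records:
        for word in text.split():
            yield word, doc_id


def shuffle(pairs):
    # sort by key so equal keys become adjacent, then group them
    for word, group in groupby(sorted(pairs, key=itemgetter(0)), key=itemgetter(0)):
        yield word, [doc_id for _, doc_id in group]


def reduce_phase(word, doc_ids):
    # keep each document ID once, in first-seen order
    unique = []
    for d in doc_ids:
        if d not in unique:
            unique.append(d)
    return word, unique


def inverted_index(records):
    return [reduce_phase(w, ids) for w, ids in shuffle(map_phase(records))]


docs = [["d1", "to be or not to be"], ["d2", "to do"]]
for entry in inverted_index(docs):
    print(entry)
```

Because the sort is stable, document IDs stay in input order within each word, so "to" maps to ['d1', 'd2'].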

The second task required us to replicate the following query:

SELECT *
FROM Orders, LineItem
WHERE Orders.order_id = LineItem.order_id;


in Map/Reduce using the records dataset. The input consists of database records formatted as lists of strings, where each list element corresponds to a different field of the record.
The first item (index 0) in each record is a string that identifies which table the record originates from. This field has two possible values: line_item, which indicates that the record is a line item, and order, which indicates that the record is an order.
The second element (index 1) in each record is the order_id.

Lastly, LineItem records have 17 elements including the identifier string and Order records have 10 elements including the identifier string.

The output should be a joined record: a single list of length 27 that contains the fields from the order record followed by the fields from the line item record. Each list element should be a string.


 import MapReduce  
 import sys  
   
 mr = MapReduce.MapReduce()  
   
 # =============================  
 # Do not modify above this line  
   
 def mapper(record):
      # key: order_id (index 1), shared by an order and its line items
      # value: the full record; record[0] tells which table it comes from
      orderid=record[1]
      mr.emit_intermediate(orderid, record)


 def reducer(key, list_of_values):
      #key: orderid
      #list_of_values: full records from both tables with this orderid
      #split them by table tag rather than assuming the order record comes first
      orders=[r for r in list_of_values if r[0]=='order']
      line_items=[r for r in list_of_values if r[0]=='line_item']
      #join every order with every line item: 10 + 17 = 27 fields, order fields first
      for order in orders:
           for item in line_items:
                mr.emit(order+item)
   
 # Do not modify below this line  
 # =============================  
 if __name__ == '__main__':  
  inputdata = open(sys.argv[1])  
  mr.execute(inputdata, mapper, reducer)  
   

Test it with:

python join.py records.json
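To cross-check the Map/Reduce output on a small input, the same equi-join can be computed in memory with a hash join: index the order records by order_id, then pair each line item with its matching order. The records below are hypothetical and truncated to three fields instead of the 10 and 17 fields of the real dataset.

```python
from collections import defaultdict


def hash_join(records):
    # build phase: index every order record by its order_id (index 1)
    orders = defaultdict(list)
    for rec in records:
        if rec[0] == "order":
            orders[rec[1]].append(rec)
    # probe phase: join each line item with every order sharing its order_id
    joined = []
    for rec in records:
        if rec[0] == "line_item":
            for order in orders[rec[1]]:
                joined.append(order + rec)  # order fields first, then line item fields
    return joined


records = [
    ["order", "1", "alice"],
    ["line_item", "1", "widget"],
    ["line_item", "1", "gadget"],
    ["line_item", "2", "orphan"],
]
for row in hash_join(records):
    print(row)
```

The line item with no matching order is dropped, as in the SQL inner join. Row order may differ from the Map/Reduce version, so compare the two outputs after sorting.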

The third task asked us to consider a simple social network dataset consisting of key-value pairs, where each key is a person and each value is a friend of that person, and to describe a MapReduce algorithm that counts the number of friends each person has.
The input is a 2 element list: [personA, personB] where personA is the name of a person formatted as a string and personB is the name of one of personA’s friends formatted as a string.

This implies that personB is a friend of personA, but it does not imply that personA is a friend of personB. The output should be a (person, friend count) tuple, where person is a string and friend count is an integer describing the number of friends person has.


 import MapReduce  
 import sys  
   
 mr = MapReduce.MapReduce()  
   
 # =============================  
 # Do not modify above this line  
   
 def mapper(record):  
      # key: person  
      # value: friend  
      key = record[0]  
      friend=record[1]  
      mr.emit_intermediate(key, friend)  
   
   
 def reducer(key, list_of_values):  
      #key: person  
      #list_of_values: friend_list  
      mr.emit((key, len(list_of_values)))  
   
 # Do not modify below this line  
 # =============================  
 if __name__ == '__main__':  
  inputdata = open(sys.argv[1])  
  mr.execute(inputdata, mapper, reducer)  
   

You can test it using the friends dataset and running:

python friend_count.py friends.json
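Each record contributes exactly one friend to its first element, so the reducer's len(list_of_values) is simply a per-key count. A quick in-memory check with collections.Counter, on a made-up list of pairs, gives the same numbers:

```python
from collections import Counter

pairs = [["Alice", "Bob"], ["Alice", "Carol"], ["Bob", "Alice"], ["Dave", "Alice"]]

# count how many times each person appears as the first element of a pair
friend_count = Counter(person for person, _ in pairs)
print(sorted(friend_count.items()))
```

As in the Map/Reduce version, someone who never appears as personA (Carol here) gets no output tuple at all, and a duplicated pair would be counted twice.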

The next problem, still on the friends dataset, asked us to implement a MapReduce algorithm that generates a list of all non-symmetric friend relationships. The input is the same as in the previous problem, but this time the expected output is both the (person, friend) and the (friend, person) tuple for each asymmetric friendship, remembering that only one of those two tuples will exist in the input.


 import MapReduce  
 import sys  
   
 mr = MapReduce.MapReduce()  
   
 # =============================  
 # Do not modify above this line  
   
 def mapper(record):  
      # key: person  
      # value: friend  
      key = record[0]  
      friend=record[1]  
      mr.emit_intermediate(key, friend)  
   
   
 def reducer(key, list_of_values):
      #key: person
      #list_of_values: people this person lists as friends
      #mr.intermediate: full person -> friend list dictionary built by the mappers
      for v in list_of_values:#for every friend of mine
           #asymmetric if he has no friend list at all, or I am not on it
           if key not in mr.intermediate.get(v, []):
                mr.emit((key, v))
                mr.emit((v, key))
   
 # Do not modify below this line  
 # =============================  
 if __name__ == '__main__':  
  inputdata = open(sys.argv[1])  
  mr.execute(inputdata, mapper, reducer)  
   

The solution can be tested with:

python asymmetric_friendships.py friends.json
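The reducer above reads mr.intermediate, the whole intermediate dictionary. That works with this MapReduce.py, but an isolated reducer on a real cluster would not have it. An alternative that stays within the model, shown here as a framework-free sketch on made-up data, keys each friendship on the alphabetically sorted pair of names: a pair seen in both directions is symmetric, a pair seen in only one direction is asymmetric, and the reducer then emits both orderings.

```python
from collections import defaultdict


def mapper(record):
    person, friend = record
    # key: the unordered pair (sorted, so both directions meet at the same key)
    # value: the direction actually present in the input
    return tuple(sorted((person, friend))), (person, friend)


def reducer(key, directions):
    # a friendship is asymmetric when only one direction was seen
    if len(set(directions)) == 1:
        a, b = key
        return [(a, b), (b, a)]
    return []


def asymmetric_friendships(records):
    groups = defaultdict(list)  # stands in for the shuffle phase
    for record in records:
        key, value = mapper(record)
        groups[key].append(value)
    out = []
    for key, values in groups.items():
        out.extend(reducer(key, values))
    return out


records = [["Alice", "Bob"], ["Bob", "Alice"], ["Alice", "Carol"]]
print(asymmetric_friendships(records))
```

Using a set of directions means a duplicated one-way pair is still reported as asymmetric.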

The fifth problem moved over to the dna dataset, a set of key-value pairs where each key is a sequence id and each value is a string of nucleotides. We had to write a MapReduce query that removes the last 10 characters from each string of nucleotides and then removes any duplicates this generates. The input is a 2 element list: [sequence id, nucleotides], where sequence id is a unique identifier formatted as a string and nucleotides is a sequence of nucleotides formatted as a string.


 import MapReduce  
 import sys  
   
 mr = MapReduce.MapReduce()  
   
 # =============================  
 # Do not modify above this line  
   
 def mapper(record):  
      # key: id  
      # value: sequence  
      key = record[0]  
      sequence=record[1]  
      trimmed=sequence[:-10]  
      mr.emit_intermediate(trimmed, 0)#trim last 10 characters then pass it along as key, automatically removing all duplicates  
   
   
 def reducer(key, list_of_values):
      #key: trimmed sequence (duplicates were already merged under one key)
      #list_of_values: one placeholder 0 per original sequence with this trim, unused
      mr.emit(key)#nothing to do here, just print every unique key
   
 # Do not modify below this line  
 # =============================  
 if __name__ == '__main__':  
  inputdata = open(sys.argv[1])  
  mr.execute(inputdata, mapper, reducer)  
   


Run it as:

python unique_trims.py dna.json 
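The deduplication comes entirely from the shuffle: identical trimmed strings land on the same key, so each reducer call sees one distinct sequence. Outside the framework, the same result is a set comprehension over the trimmed strings; the sequences below are invented and much shorter than real ones.

```python
records = [
    ["seq1", "GATTACAGATTACA" + "C" * 10],
    ["seq2", "GATTACAGATTACA" + "G" * 10],
    ["seq3", "TTTTAAAA" + "G" * 10],
]

# drop the last 10 characters of each sequence; the set removes duplicates
unique_trims = {nucleotides[:-10] for _, nucleotides in records}
print(sorted(unique_trims))
```

Note that a sequence of 10 characters or fewer trims to the empty string in both versions, since slicing with [:-10] never raises an error.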

Finally, the last problem asked us to compute the matrix multiplication between two non-square matrices, but it was a little flawed, since we had no way of knowing the size of the resulting matrix beforehand. Given that we're working in Map/Reduce, we would have to read all the data once to determine the size correctly, then cycle over it a second time to perform our calculations; in a Big Data context, however, being allowed two passes over the same data is a rare luxury. This forced us to hardcode the size inside our script, somewhat crippling the exercise.

Anyway, the input consisted of matrix row records formatted as lists. Each list has the format [matrix, i, j, value], where matrix is a string and i, j, and value are integers.

The first item, matrix, is a string that identifies which matrix the record originates from. This field has two possible values: a if the record is from matrix A, or b if it is from matrix B. The output should be matrix row records formatted as tuples. Each tuple has the format (i, j, value), where each element is an integer.


 import MapReduce  
 import sys  
   
 mr = MapReduce.MapReduce()  
   
 # =============================  
 # Do not modify above this line  
   
 def mapper(record):  
      # key: matrix (a,b)  
      # value: i,j,value  
      key = record[0]  
      if key=='a':#since it's A*B  
           mr.emit_intermediate(key, [record[1],record[2],record[3]])#index a on i,j,value  
      else:  
           mr.emit_intermediate(key, [record[2],record[1],record[3]])#and b on j,i,value  
   
   
 def reducer(key, list_of_values):
      #key: matrix a or b
      #list_of_values: i,j,value for a and j,i,value for b
      #mr.intermediate: all entries of both a and b in a dictionary keyed on 'a' and 'b'
      if key=='a':#only one reducer needs to compute A*B
           #the matrix dimensions are unknown, so they are hardcoded (5)
           n=5
           #populate two dictionaries with the known entries
           a={(v[0], v[1]): v[2] for v in list_of_values}
           b={(r[0], r[1]): r[2] for r in mr.intermediate['b']}
           #compute (A*B)ij = SUM(Aik * Bkj) for k in 0..n-1, where b[(j,k)] holds Bkj
           #entries missing from the sparse input are zeros, hence .get(..., 0)
           for i in range(n):
                for j in range(n):
                     result=sum(a.get((i,k), 0)*b.get((j,k), 0) for k in range(n))
                     mr.emit((i,j,result))
   
 # Do not modify below this line  
 # =============================  
 if __name__ == '__main__':  
  inputdata = open(sys.argv[1])  
  mr.execute(inputdata, mapper, reducer)  
   

The script is based on the matrix dataset and can be run as:

python multiply.py matrix.json
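The script above sends the whole of A and B to a single reducer. A commonly described alternative is the single-step scheme in which every output cell (i, j) gets its own reducer: each A[i][k] is replicated to all keys (i, j), each B[k][j] to all keys (i, j), both tagged with k, and the reducer multiplies the entries that share k. It still needs the dimensions up front, so this framework-free sketch takes them as parameters; the 2x2 matrices are made up for the example.

```python
from collections import defaultdict


def mapper(record, n_rows_a, n_cols_b):
    matrix, i, j, value = record
    if matrix == "a":
        # A[i][j] contributes to every output cell in row i; tag it with k = j
        for col in range(n_cols_b):
            yield (i, col), ("a", j, value)
    else:
        # B[i][j] contributes to every output cell in column j; tag it with k = i
        for row in range(n_rows_a):
            yield (row, j), ("b", i, value)


def reducer(key, values):
    a = {k: v for tag, k, v in values if tag == "a"}
    b = {k: v for tag, k, v in values if tag == "b"}
    # missing (sparse) entries are zeros, so only k present in both contribute
    return key + (sum(a[k] * b[k] for k in a if k in b),)


def multiply(records, n_rows_a, n_cols_b):
    groups = defaultdict(list)  # stands in for the shuffle phase
    for record in records:
        for key, value in mapper(record, n_rows_a, n_cols_b):
            groups[key].append(value)
    return sorted(reducer(key, values) for key, values in groups.items())


# A = [[1, 2], [0, 3]] and B = [[4, 0], [5, 6]] in sparse [matrix, i, j, value] form
records = [
    ["a", 0, 0, 1], ["a", 0, 1, 2], ["a", 1, 1, 3],
    ["b", 0, 0, 4], ["b", 1, 0, 5], ["b", 1, 1, 6],
]
print(multiply(records, 2, 2))
```

Unlike the script above, a cell that receives no entries at all (an empty row of A together with an empty column of B) is not emitted and is implicitly zero.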

Once again, this was an interesting approach to both Python programming and the Map/Reduce model.
