My professor started off our Java streams unit with: “Java 8 was the biggest, baddest update to Java.” After working with Java streams for a bit, I have to say that I agree.
What Are Java Streams
So, first of all, what are Java Streams?
The Java Stream API allows you to work with large sets of data easily by providing functional programming methods to filter, transform, and combine your data. I will be getting into functional programming in another concept post, but for now, let’s define it broadly as a programming style that treats computation as the evaluation of mathematical functions.
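To make "filter, transform, and combine" concrete, here is a minimal sketch that is separate from the assignment code. The class name StreamBasics, the method shout, and the sample words are all made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

class StreamBasics {
    // Keeps words longer than three characters, upper-cases them,
    // and joins the survivors into a single space-separated string.
    static String shout(List<String> words) {
        return words.stream()
                .filter(word -> word.length() > 3)   // filter: drop short words
                .map(String::toUpperCase)            // transform: upper-case each word
                .collect(Collectors.joining(" "));   // combine: join into one string
    }
}
```

Calling shout(List.of("java", "is", "fun", "with", "streams")) returns "JAVA WITH STREAMS", with no explicit loop anywhere.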
Now that we know what Java Streams are, I’ll walk through them alongside my solution code!
Example Code
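import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TweetTaggeeRevised {
    // Column indexes of the Tweeter name and the tweet text in each CSV line.
    private static final int namePos = 8;
    private static final int textPos = 11;

    // Keeps only lines that are non-null and not blank.
    // The null check comes first so trim() is never called on a null line.
    private static Predicate<String> clean() {
        return line -> line != null && !line.trim().isEmpty();
    }

    // Returns the Tweeter name column of a CSV line.
    private static String getName(String line) {
        return line.split(",")[namePos];
    }

    // Turns the text column of a CSV line into a stream of its distinct @tags.
    private static Stream<String> turnStream(String line) {
        return Arrays.stream(line.split(",")[textPos].split("\\s+"))
                .filter(word -> word.startsWith("@"))
                .distinct();
    }

    private static Function<String, Stream<String>> getTags = (line) -> turnStream(line);

    /**
     * <p>getTweeterTaggeeCountRev parses through a tweet file and returns a Map of Tweeter
     * names and a map of taggees that the tweeter has used and its counts.
     *
     * <p>This version is thread-safe. Collectors.flatMapping requires Java 9 or later.
     *
     * @author Yiping-Allison
     * @param lines the lines of the CSV file, header line first
     * @return Map of Tweeter names to a map of each taggee and the number of tweets
     *         in which the Tweeter tagged them
     */
    public static Map<String, Map<String, Long>> getTweeterTaggeeCountRev(List<String> lines) {
        Map<String, Map<String, Long>> res = lines.stream()
                .skip(1)
                .parallel()
                .filter(clean())
                .collect(Collectors.groupingByConcurrent(TweetTaggeeRevised::getName,
                        Collectors.flatMapping(getTags,
                                Collectors.groupingBy(Function.identity(),
                                        Collectors.counting()))));
        return res;
    }
}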
The goal of the demo code is to parse a valid Twitter data CSV file and return a map from each Tweeter’s name to another map containing the tags they used and their respective counts.
A Note on the Code
For some background, the CSV file always has the Tweeter name at index 8 and the tweet text at index 11. If you have a dataset with different positions, you can modify these indexes yourself. Or even better… you can write a function to find the correct index dynamically.
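One way such a dynamic lookup might look is sketched below. ColumnFinder and findColumn are hypothetical names that do not appear in the assignment code, and the sketch assumes the header has no quoted commas, since it relies on the same naive split(",") as the rest of the demo.

```java
import java.util.Arrays;
import java.util.List;

class ColumnFinder {
    // Returns the index of the named column in a comma-separated header line,
    // or -1 if the header has no column with that exact name.
    static int findColumn(String headerLine, String columnName) {
        List<String> columns = Arrays.asList(headerLine.split(","));
        return columns.indexOf(columnName);
    }
}
```

With a header that begins ",tweet_id,airline_sentiment,…" like the one in my file, findColumn(header, "name") returns 8, which matches the hard-coded name position.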
Here is an example line of the CSV:
,tweet_id,airline_sentiment,airline_sentiment_confidence,negativereason,negativereason_confidence,airline,airline_sentiment_gold,name,nega,,,,,,
71,1782738945,negative,1,Customer Service Issue,1,Southwest,,jane,,1, http://testurl http://test @yoyo @yoyo has the WORST customer service of any airline I've ever flown.,,2/17/2015 8:30,,Central Time (US & Canada)
The Breakdown
Take a look at the last function in the example: this is where the main stream logic lives. A stream is like a pipeline. Starting at the beginning, you can see that I opened the stream with lines.stream(), where lines is a list of strings representing Twitter CSV lines, passed in from a different method not included in the demo. Next comes skip(1). skip(n) is relatively straightforward: it skips the first n elements of the stream. In my case, I wanted the stream to skip the header line of my CSV.
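As a tiny illustration of skip(n) on its own, here is a sketch with a made-up class and method name (SkipHeader, dataLines). It assumes the first element is always the header.

```java
import java.util.List;
import java.util.stream.Collectors;

class SkipHeader {
    // Returns every line except the first one (assumed to be the CSV header).
    // If the list has fewer than two lines, the result is simply empty.
    static List<String> dataLines(List<String> lines) {
        return lines.stream()
                .skip(1)
                .collect(Collectors.toList());
    }
}
```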
The filter() step is the meaty part. Filter is commonly used in functional programming as an easy way to remove items that don’t fit your criteria. You supply a condition that evaluates to true or false for each element; elements for which it is true stay in the stream, and the rest are dropped. In my code I called filter(clean()), which means that every element being processed is tested against the predicate returned by clean(). A Predicate is a generic functional interface representing a function that takes one argument and returns a boolean, which is exactly what a filter needs. In my example code, any line that is null, empty, or blank is removed from the stream because the predicate returns false for it. Remember: an element is only kept at that stage of the pipeline if the predicate is true.
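Here is a small standalone sketch of a Predicate used with filter(). BlankLineFilter, NOT_BLANK, and keepNonBlank are illustrative names only. Note the order of the two checks: the null test has to come first, otherwise trim() would throw a NullPointerException on a null line.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class BlankLineFilter {
    // True only for lines that are non-null and contain visible characters.
    static final Predicate<String> NOT_BLANK =
            line -> line != null && !line.trim().isEmpty();

    // Keeps the lines for which NOT_BLANK is true, in their original order.
    static List<String> keepNonBlank(List<String> lines) {
        return lines.stream()
                .filter(NOT_BLANK)
                .collect(Collectors.toList());
    }
}
```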
After the filter stage, we start parsing the rest of the line for what we’re interested in: the Tweeter name, the Tweeter’s tags, and their respective counts. Before we get into this part of the explanation, we need some definitions first…
Define the following:
- Collectors is a Java class which provides helpful reduction operations
- groupingBy is a collector which groups items together by some property and stores them in a map data structure
- flatMapping is a collector (added in Java 9) which maps each element to a stream and flattens the results before passing them on (E.g. a 2-D array becomes a 1-D array)
- counting is a collector which counts the number of elements it receives
- Function.identity returns a function which always returns its input argument
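The definitions above can be tried out in isolation with a short sketch like the following. CollectorBasics and its two methods are made-up names, wordsByFirstLetter assumes every sentence is non-empty, and flatMapping needs Java 9 or later.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class CollectorBasics {
    // groupingBy + Function.identity + counting:
    // groups equal words together and counts how many are in each group.
    static Map<String, Long> wordCounts(List<String> words) {
        return words.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    // groupingBy + flatMapping: groups sentences by their first character,
    // then flattens each group's sentences into one list of words.
    static Map<Character, List<String>> wordsByFirstLetter(List<String> sentences) {
        return sentences.stream()
                .collect(Collectors.groupingBy(
                        sentence -> sentence.charAt(0),
                        Collectors.flatMapping(
                                sentence -> Arrays.stream(sentence.split(" ")),
                                Collectors.toList())));
    }
}
```

For example, wordCounts(List.of("a", "b", "a")) maps "a" to 2 and "b" to 1.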
Now that we have the definitions, we can continue with TweetTaggeeRevised::getName. This notation is a method reference, a shorthand for a lambda that simply calls an existing method (I’ll get to this in another post). For now, if you take a look at getName(), you can see that it just returns the field at the name position. The next part is trickier to understand. At this point in the pipeline, we still have a whole line, but now we want the tags and their counts. So how do we do that? Since we’re using streams, we don’t need loops! Instead, we can define a new stream for each line, so I added a flatMapping operation.
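To show what the :: notation stands for, here is a sketch that writes the same function twice, once as a lambda and once as a method reference. MethodRefDemo and nameOf are hypothetical stand-ins for the real class and getName(); the index 8 is the name position from my file.

```java
import java.util.function.Function;

class MethodRefDemo {
    // Stand-in for getName(): returns the field at index 8 of a CSV line.
    static String nameOf(String line) {
        return line.split(",")[8];
    }

    // The two functions below behave identically; the method reference
    // is just a shorter way of writing the lambda.
    static final Function<String, String> AS_LAMBDA = line -> MethodRefDemo.nameOf(line);
    static final Function<String, String> AS_METHOD_REF = MethodRefDemo::nameOf;
}
```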
The line is passed to getTags, which acts as an intermediary function. getTags applies a different function to the line, namely turnStream(). turnStream() takes the line, extracts the text column, and splits it on whitespace, which gives us a stream of words from the text column of our CSV file. It then keeps only the words that start with @ and removes duplicate tags using the distinct() method.
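The tag extraction can be tried on its own with a sketch like this one. TagExtractor and tagsIn are illustrative names, and unlike turnStream() the method takes just the tweet text rather than a whole CSV line. It also inherits the same limitation: a tag followed directly by punctuation, such as "@yoyo,", would count as a different word.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class TagExtractor {
    // Splits the text on whitespace and returns each distinct @tag once,
    // in order of first appearance.
    static List<String> tagsIn(String text) {
        return Arrays.stream(text.split("\\s+"))
                .filter(word -> word.startsWith("@"))
                .distinct()
                .collect(Collectors.toList());
    }
}
```

Run on the text column of the example CSV line, which mentions @yoyo twice, tagsIn returns a list containing @yoyo only once.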
When we put all of this together at:
Collectors.groupingBy(Function.identity(),
Collectors.counting()))));
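By this point, each tweet contributes a stream of its distinct tags. These tags are then counted, and the counts are grouped together in the map organized by Tweeter names. Because duplicates are removed per tweet, each count is the number of tweets in which that Tweeter used the tag.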
The results would look something like this, given you follow the same formatting:
Tweeter: jaykirell
Taggee: @united Count: 1
Taggee: @MaxAbrahms Count: 1
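The printing code is not part of the demo, so the following is only a guess at how output in that shape could be produced from the returned map. ResultPrinter and format are hypothetical names, and the order of the entries depends on the map implementation.

```java
import java.util.Map;

class ResultPrinter {
    // Renders the nested map as one "Tweeter:" line per Tweeter, followed by
    // one "Taggee: ... Count: ..." line for each of that Tweeter's tags.
    static String format(Map<String, Map<String, Long>> counts) {
        StringBuilder out = new StringBuilder();
        counts.forEach((tweeter, tags) -> {
            out.append("Tweeter: ").append(tweeter).append('\n');
            tags.forEach((tag, count) ->
                    out.append("Taggee: ").append(tag)
                       .append(" Count: ").append(count).append('\n'));
        });
        return out.toString();
    }
}
```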
One thing to note: this code uses a parallel stream. Streams for data processing come in two kinds: sequential and parallel. Sequential processing is a lot like using loops: each element of the stream is processed one at a time. Parallel streams are the opposite: they use multiple threads to process several lines of data at the same time, which can cause data races when used incorrectly (for example, when a lambda modifies shared state). Please make sure to follow the golden rules of parallel processing before playing around with it.
To use the normal sequential version, remove the .parallel() call; the equivalent grouping collector is just Collectors.groupingBy(). I will explain concurrency concepts in a different post.
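For comparison, a sequential version of the whole pipeline might look like the sketch below. SequentialTagCount, nameOf, tagsOf, and count are made-up names, the column indexes 8 and 11 are the ones from my file, and flatMapping again assumes Java 9 or later.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SequentialTagCount {
    // Tweeter name column of a CSV line.
    static String nameOf(String line) {
        return line.split(",")[8];
    }

    // Distinct @tags found in the text column of a CSV line.
    static Stream<String> tagsOf(String line) {
        return Arrays.stream(line.split(",")[11].split("\\s+"))
                .filter(word -> word.startsWith("@"))
                .distinct();
    }

    // Same pipeline as the parallel version, but without parallel()
    // and with groupingBy in place of groupingByConcurrent.
    static Map<String, Map<String, Long>> count(List<String> lines) {
        return lines.stream()
                .skip(1)                                                  // skip the header
                .filter(line -> line != null && !line.trim().isEmpty())  // drop blank lines
                .collect(Collectors.groupingBy(
                        SequentialTagCount::nameOf,
                        Collectors.flatMapping(
                                SequentialTagCount::tagsOf,
                                Collectors.groupingBy(Function.identity(),
                                        Collectors.counting()))));
    }
}
```

Both versions should produce the same counts; the sequential one just does the work on a single thread.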
I hope this has been somewhat of an interesting read on how I solved one part of my assignment using Java Streams API and various Java implementations of Functional Programming concepts!