Scalding the Crunchy Pig for Cascading into the Hive
aka. What's the best tool for writing Hadoop jobs?
David Whiting, Sept 2013
aka. What's the best tool for writing Hadoop jobs?
David Whiting, Sept 2013
"Build a Cascade
from Flows
which connect Taps
via Pipes
built into Assemblies
to process Tuples
"
Inputs
Output
/**
 * Cascading sub-assembly that computes the number of streams per (artist, country):
 * joins track-played messages with user info (on user name) and track metadata
 * (on track id), canonicalises the artist name, then counts per group.
 *
 * Fix: the third schema field was named "trackMetadata", which both broke the
 * "*Fields" naming convention of its siblings and was shadowed by the
 * constructor's {@code Pipe trackMetadata} parameter. Renamed to
 * {@code trackMetadataFields}.
 */
public class CascadingStreamExample extends SubAssembly {

  // Input schemas (field names carry a per-source prefix to survive the joins).
  private final Fields trackPlayedMessageFields =
      new Fields("tpm.track_id", "tpm.user_name", "tpm.timestamp");
  private final Fields userInfoFields =
      new Fields("u.user_name", "u.country", "u.subscription_level");
  private final Fields trackMetadataFields =
      new Fields("t.track_id", "t.title", "t.artist");

  public CascadingStreamExample(Pipe trackPlayedMessage, Pipe userInfo, Pipe trackMetadata) {
    // Join plays with user info on user name.
    Pipe tpmUser = new CoGroup(
        trackPlayedMessage, new Fields("tpm.user_name"),
        userInfo, new Fields("u.user_name"),
        new InnerJoin());
    // Join the result with track metadata on track id (inner join is the default joiner).
    Pipe tpmUserTrack = new CoGroup(
        tpmUser, new Fields("tpm.track_id"),
        trackMetadata, new Fields("t.track_id"));
    // Replace "t.artist" with its canonical form.
    Pipe canonicalise = new Each(
        tpmUserTrack, new Fields("t.artist"), new CanonicalArtist(), Fields.REPLACE);
    // Count streams per (canonical_artist, country).
    Pipe aggregate = new AggregateBy(
        canonicalise,
        new Fields("canonical_artist", "u.country"),
        new CountBy(new Fields("count")));
    // Keep only the output columns and give them their public names.
    Pipe cut = new Retain(aggregate, new Fields("canonical_artist", "u.country", "count"));
    Pipe rename = new Rename(cut, Fields.ALL, new Fields("artist", "country", "streams"));
    this.setTails(rename);
  }
}
/**
 * Cascading function mapping an artist name to its canonical (lower-case) form,
 * emitted as the single field "canonical_artist".
 *
 * Fix: {@code toLowerCase()} previously used the default platform locale, so the
 * canonical key could differ between JVMs (e.g. the Turkish dotless-i). Use
 * {@code Locale.ROOT} for a locale-independent canonicalisation.
 */
public class CanonicalArtist extends BaseOperation<Object> implements Function<Object> {

  public CanonicalArtist() {
    // One argument in, one field ("canonical_artist") out.
    super(1, new Fields("canonical_artist"));
  }

  @Override
  public void operate(FlowProcess flowProcess, FunctionCall<Object> functionCall) {
    // NOTE(review): assumes the incoming artist field is never null — a null
    // here would throw NPE; confirm upstream data guarantees this.
    String artist = functionCall.getArguments().getString(0);
    String canonicalArtist = artist.toLowerCase(java.util.Locale.ROOT);
    functionCall.getOutputCollector().add(new Tuple(canonicalArtist));
  }
}
/**
 * Scalding job computing streams per (artist, country).
 *
 * Fix: the user-info source referenced `userinfoSchema` (lower-case "i"), which
 * does not compile — the declaration is `userInfoSchema`.
 */
class ArtistCountryStreams(args: Args) extends Job(args) {
  val trackPlayedMessageSchema = ('tpmTrackId, 'tpmUserName, 'tpmTimestamp)
  val userInfoSchema = ('uUserName, 'uCountry, 'uSubscriptionLevel)
  val trackMetadataSchema = ('tTrackId, 'tTitle, 'tArtist)

  // NOTE(review): the first source path comes from args("tpm") while the other
  // two use the literals "userInfo" / "trackMetadata" — confirm whether those
  // should also be job arguments.
  SomeSource(args("tpm"), trackPlayedMessageSchema)
    // Joins: plays -> user info (on user name), then -> track metadata (on track id).
    .joinWithSmaller('tpmUserName -> 'uUserName, SomeSource("userInfo", userInfoSchema))
    .joinWithSmaller('tpmTrackId -> 'tTrackId, SomeSource("trackMetadata", trackMetadataSchema))
    // Canonicalise the artist name.
    .map('tArtist -> 'canonArtist) { tArtist: String => tArtist.toLowerCase }
    .project('canonArtist, 'uCountry)
    // Count streams per (artist, country).
    .groupBy('canonArtist, 'uCountry) { _.size('streams) }
    .rename(('canonArtist, 'uCountry) -> ('artist, 'country))
}
Works with real types, using a strategy based on lazy collections
PCollection<T>
PTable<K,V>
PGroupedTable<K,V>
MapFn<T1,T2>
: T1 → T2
CombineFn<K,V>
: (K, Iterable<V>) → (K, V)public PCollection<ArtistCountryStreams> computeStreams( PCollection<TrackPlayedMessage> trackPlayedMessages, PCollection<UserInfo> userInfos, PCollection<TrackMetadata> trackMetadatas) { PCollection<Pair<TrackPlayedMessage, UserInfo>> messageUser = Join.innerJoin( trackPlayedMessages.by(new TrackPlayedMessage.Username(), Avros.strings()), userInfos.by(new UserInfo.Username(), Avros.strings())) .values(); PCollection<ArtistCountryStreams> artistCountryStreams = Join.innerJoin( messageUser.by(mapFirst(new TrackPlayedMessage.TrackId(), UserInfo.class), Avros.ints()), trackMetadatas.by(new TrackMetadata.TrackId(), Avros.ints()) ) .values() .parallelDo(new CreateArtistCountryStreamsKey(), Avros.reflects(ArtistCountryStreams.Key.class)) .count() .parallelDo(new ArtistCountryStreams.Create(), Avros.reflects(ArtistCountryStreams.class)); return artistCountryStreams; }
/**
 * Immutable value object for a single "track played" event.
 *
 * Fix: added {@code equals}/{@code hashCode}/{@code toString} — instances are
 * compared structurally in tests (via list equality), which silently degraded to
 * reference equality without them.
 */
public static class TrackPlayedMessage {
  public final String username;
  public final int trackId;
  public final long timestamp;

  /** No-arg constructor required by Avro reflection ({@code Avros.reflects}). */
  public TrackPlayedMessage() {
    username = null;
    trackId = 0;
    timestamp = 0;
  }

  public TrackPlayedMessage(String username, int trackId, long timestamp) {
    this.username = username;
    this.trackId = trackId;
    this.timestamp = timestamp;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof TrackPlayedMessage)) {
      return false;
    }
    TrackPlayedMessage that = (TrackPlayedMessage) o;
    return trackId == that.trackId
        && timestamp == that.timestamp
        && java.util.Objects.equals(username, that.username);
  }

  @Override
  public int hashCode() {
    return java.util.Objects.hash(username, trackId, timestamp);
  }

  @Override
  public String toString() {
    return "TrackPlayedMessage{username=" + username
        + ", trackId=" + trackId + ", timestamp=" + timestamp + "}";
  }

  /** Key extractor: event's user name. */
  public static class Username extends MapFn<TrackPlayedMessage, String> {
    @Override
    public String map(TrackPlayedMessage input) {
      return input.username;
    }
  }

  /** Key extractor: event's track id. */
  public static class TrackId extends MapFn<TrackPlayedMessage, Integer> {
    @Override
    public Integer map(TrackPlayedMessage input) {
      return input.trackId;
    }
  }
}
// In-memory fixtures for the Crunch pipeline test (MemPipeline runs locally).
private PCollection<TrackPlayedMessage> messages = MemPipeline.typedCollectionOf(
    Avros.reflects(TrackPlayedMessage.class),
    new TrackPlayedMessage("adam", 1001, 200000L),
    new TrackPlayedMessage("brian", 1001, 200001L),
    new TrackPlayedMessage("charlie", 1002, 200002L),
    new TrackPlayedMessage("dave", 1003, 200003L),
    new TrackPlayedMessage("dave", 1004, 200004L)
);
private PCollection<TrackMetadata> trackMeta = MemPipeline.typedCollectionOf(
    Avros.reflects(TrackMetadata.class),
    new TrackMetadata(1001, "track1", "artistA"),
    new TrackMetadata(1002, "track2", "artistA"),
    new TrackMetadata(1003, "track1", "artistB"),
    new TrackMetadata(1004, "track2", "artistB")
);
private PCollection<UserInfo> userInfo = MemPipeline.typedCollectionOf(
    Avros.reflects(UserInfo.class),
    new UserInfo("adam", SubscriptionLevel.FREE, "PL"),
    new UserInfo("brian", SubscriptionLevel.FREE, "US"),
    new UserInfo("charlie", SubscriptionLevel.PREMIUM, "US"),
    new UserInfo("dave", SubscriptionLevel.PREMIUM, "GB")
);

// Runs the pipeline over the fixtures and compares the materialised output
// against an expected list.
@Test
public void testComputeStreams() throws Exception {
  CrunchStreamExample crunchStreamExample = new CrunchStreamExample();
  List<ArtistCountryStreams> output = Lists.newArrayList(
      crunchStreamExample.computeStreams(messages, userInfo, trackMeta)
          .materialize());
  // NOTE(review): `expected` is left empty, so this asserts the pipeline
  // produces NO output — which contradicts the fixtures above. From the data,
  // the expected result looks like (artistA, PL, 1), (artistA, US, 2),
  // (artistB, GB, 2); confirm ArtistCountryStreams's constructor/equals and
  // populate `expected` accordingly.
  List<ArtistCountryStreams> expected = Lists.newArrayList();
  assertEquals(expected, output);
}
object CrunchStreamExample {
  // Case classes give structural equals/hashCode. The previous plain classes
  // compared by reference, so ArtistCountryKey could never collide in count():
  // every record got its own key and every count came out as 1.
  case class TrackPlayedMessage(username: String, trackId: Int, timestamp: Long)
  case class UserInfo(username: String, isPremium: Boolean, country: String)
  case class TrackMetaData(trackId: Int, title: String, artist: String)
  case class ArtistCountryKey(artist: String, country: String)
  case class ArtistCountryStreams(key: ArtistCountryKey, streams: Long)

  /**
   * Streams per (artist, country): join plays with user info on user name, join
   * the result with track metadata on track id, then count occurrences of each
   * (artist, country) pair.
   */
  def computeStreams(trackPlayedMessages: PCollection[TrackPlayedMessage],
                     userInfos: PCollection[UserInfo],
                     trackMetadatas: PCollection[TrackMetaData])
      : PCollection[ArtistCountryStreams] = {
    trackPlayedMessages.by(_.username)
      .innerJoin(userInfos.by(_.username))
      .values()
      .by(_._1.trackId)
      .innerJoin(trackMetadatas.by(_.trackId))
      .values()
      // v is ((TrackPlayedMessage, UserInfo), TrackMetaData).
      .map(v => new ArtistCountryKey(v._2.artist, v._1._2.country))
      .count()
      .map((k: ArtistCountryKey, v: Long) => new ArtistCountryStreams(k, v))
  }
}
-- Streams per (artist, country) in Pig Latin.
TRACK_PLAYED_MESSAGES = LOAD '/tpm' AS (m_track_id, m_user_name, m_timestamp);
USER_INFO = LOAD '/user_info' AS (u_user_name, u_country, u_subscription_level);
TRACK_METADATA = LOAD '/track_metadata' AS (t_track_id, t_title, t_artist);

-- Join plays with user info, then with track metadata.
MESSAGE_USER = JOIN TRACK_PLAYED_MESSAGES BY m_user_name, USER_INFO BY u_user_name;
MESSAGE_USER_TRACK = JOIN MESSAGE_USER BY m_track_id, TRACK_METADATA BY t_track_id;

-- Fix: a multi-field grouping key must be parenthesised.
GROUPED = GROUP MESSAGE_USER_TRACK BY (t_artist, u_country);

-- Fix: OUTPUT is a reserved keyword in Pig Latin and cannot be a relation
-- alias; renamed to RESULT. `group` (the grouping-key tuple, formerly $0)
-- carries the t_artist and u_country fields.
RESULT = FOREACH GROUPED GENERATE group.t_artist, group.u_country, COUNT(MESSAGE_USER_TRACK);
STORE RESULT INTO '/output';
-- Streams per (artist, country) in HiveQL.
-- Fixes: `timestamp` is a Hive keyword (the TIMESTAMP type) and must be
-- backtick-quoted when used as a column name; timestamps elsewhere in this
-- deck are longs, so the column is BIGINT rather than INT; and count(*)
-- returns BIGINT, so `streams` is declared BIGINT to match.
CREATE TABLE track_played_message (username STRING, track_id INT, `timestamp` BIGINT) ...
CREATE TABLE user_info (username STRING, is_premium BOOLEAN, country STRING) ...
CREATE TABLE track_metadata (track_id INT, title STRING, artist STRING) ...
CREATE TABLE artist_country_streams (artist STRING, country STRING, streams BIGINT) ...

INSERT OVERWRITE TABLE artist_country_streams
SELECT t.artist, u.country, count(*)
FROM track_played_message msg
JOIN user_info u ON msg.username = u.username
JOIN track_metadata t ON msg.track_id = t.track_id
GROUP BY t.artist, u.country