Using DSE to run Solr Spark Jobs

Ever since Tim Potter from LucidWorks published his Spark Solr project, I’ve been wanting to play with it.

This week I was at Cassandra Summit 2015, so I took the opportunity to try moving data into and out of Solr using Spark. Since I was at Cassandra Summit, I figured I would take the recently released DSE 4.8.0 for a spin as well. These are my notes from working through the demo that Tim put out. For background, I recommend reading his blog post at https://lucidworks.com/blog/solr-spark-sql-datasource/, which is what I followed to get some context.

I first downloaded DSE and used the installer to put it into the ~/dse folder. I started a single DSE Analytics node via ~/dse/bin/dse cassandra -k to enable Spark.

I then cloned the GitHub project at https://github.com/LucidWorks/spark-solr.

The first gotcha that I ran into was actually setting up the Twitter connection. It took me a while to figure out that I needed to create an “app” on Twitter’s platform in order to have OAuth credentials. The direct link is https://apps.twitter.com/. Create an app, and then you can get your user and access tokens.

Next, you need to add a twitter4j.properties file to the ./src/main/resources directory of the spark-solr project. Using the system properties didn’t work for me.

debug=true
oauth.consumerKey=YOUR_CONSUMER_KEY
oauth.consumerSecret=YOUR_CONSUMER_SECRET
oauth.accessToken=YOUR_ACCESS_TOKEN
oauth.accessTokenSecret=YOUR_ACCESS_TOKEN_SECRET
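Since a missing or mistyped key only shows up once the job tries to connect to Twitter, a quick check of the file can save a build cycle. This is a minimal sketch, not part of the spark-solr project: the object name Twitter4jCheck and its methods are made up for illustration, and it only assumes the four OAuth keys shown in the twitter4j.properties file above.

```scala
import java.io.StringReader
import java.util.Properties

object Twitter4jCheck {
  // The four OAuth keys that the twitter4j.properties file is expected to set.
  val requiredKeys: Seq[String] = Seq(
    "oauth.consumerKey",
    "oauth.consumerSecret",
    "oauth.accessToken",
    "oauth.accessTokenSecret"
  )

  // Returns the required keys that are absent, blank, or still a YOUR_... placeholder.
  def missingKeys(propertiesText: String): Seq[String] = {
    val props = new Properties()
    props.load(new StringReader(propertiesText))
    requiredKeys.filter { key =>
      val value = Option(props.getProperty(key)).map(_.trim).getOrElse("")
      value.isEmpty || value.startsWith("YOUR_")
    }
  }
}
```

From a Scala REPL started in the project root, something like Twitter4jCheck.missingKeys(scala.io.Source.fromFile("src/main/resources/twitter4j.properties").mkString) should come back empty once the real credentials are in place.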

Then I compiled via mvn clean package -DskipTests. I did try running the tests, and they mostly ran successfully for me!

I already had a SolrCloud cluster running, with ZooKeeper on port 2181, and had a collection1 using the standard default schema ready to go. The moment of truth had arrived: would the SolrRDD code work with the version of Spark that comes with DSE? Would all the versions of the jar files work out? I submitted the newly packaged Spark job to DSE:

~/dse/bin/dse spark-submit --master local[2] --verbose --class com.lucidworks.spark.SparkApp ./target/spark-solr-1.0-SNAPSHOT-shaded.jar twitter-to-solr -zkHost localhost:2181 -collection collection1

And it ran great!

Since I am running just a single DSE Spark node, it ran just a single process, so it’s pretty inefficient. It took me a bit to figure out that once the job was running, I could go to http://localhost:4040 to monitor the progress of the twitter-to-solr job. The UI for Spark has really improved since last fall when I last did Spark work.

I then triggered a commit on the Solr collection: http://localhost:8983/solr/collection1/update?commit=true, and then checked the documents: http://localhost:8983/solr/collection1/select?q=*:*&rows=0. Yep, tweets are streaming in!
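If you would rather poll the document count from a script than reload the browser, the same select request can be built and read programmatically. This is a rough sketch under a few assumptions: the object name SolrCount is hypothetical, it asks for a JSON response with wt=json, and it pulls numFound out with a regular expression instead of a real JSON parser.

```scala
import java.net.URLEncoder

object SolrCount {
  // Matches the numFound field of a Solr JSON response and captures its value.
  private val NumFound = "\"numFound\"\\s*:\\s*(\\d+)".r

  // Builds a select URL that matches `query` but asks for zero rows,
  // so the response carries only the header and the hit count.
  def countUrl(baseUrl: String, collection: String, query: String = "*:*"): String = {
    val q = URLEncoder.encode(query, "UTF-8")
    s"$baseUrl/$collection/select?q=$q&rows=0&wt=json"
  }

  // Extracts numFound from a response body; None if the field is not there.
  def numFound(responseBody: String): Option[Long] =
    NumFound.findFirstMatchIn(responseBody).map(_.group(1).toLong)
}
```

With Solr on its default port, SolrCount.numFound(scala.io.Source.fromURL(SolrCount.countUrl("http://localhost:8983/solr", "collection1")).mkString) would give the current count after a commit.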

I let the Spark job run for a while, and it seemed like there was no end in sight for downloading data. I wish the Spark UI had a way of figuring out how many jobs would be run to bring all the data in. I ended up letting it run for 20 minutes, and got to about 25,000 tweets.

So now I wanted to play with the data. Here the specific Spark Shell commands diverged from the README since I am using the DSE flavor of Spark.

To start up the Spark Shell I ran:

~/dse/bin/dse spark --jars ./target/spark-solr-1.0-SNAPSHOT-shaded.jar

If you still have the twitter-to-solr job running, go to http://localhost:4041 to see the Spark UI for this Spark Shell process; otherwise it will be on port 4040.

I won’t go through each command, since you can read about each one in the README, but here is the DSE-specific version of the commands:

import com.lucidworks.spark.SolrRDD
var solrRDD = new SolrRDD("localhost:2181","collection1")
var tweetsRDD = solrRDD.query(sc,"*:*")
var count = tweetsRDD.count()
csc.setKeyspace("tweets")
var tweetsDF = solrRDD.asTempTable(csc, "*:*", "tweets")
csc.sql("SELECT COUNT(type_s) FROM tweets WHERE type_s='echo'").show()
tweetsDF.printSchema()

Notice that we have both tweetsRDD and tweetsDF as objects. A DataFrame is quite different from an RDD, as discussed in the introductory blog post https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html.

Once I had the data pulled into my Spark Shell, I started thinking, hey, how can I get it stored into Cassandra? I tried for a while until I realized that the RDD wasn’t what I needed; the DataFrame was, as the write command below shows. The first step was to create a keyspace/table combo in Cassandra:

CREATE KEYSPACE tweets
  WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

CREATE TABLE tweets.tweets (
  id text PRIMARY KEY,
  "_indexed_at_tdt" timestamp,
  "_version_" text,
  "accessLevel_i" int,
  author_s text,
  "createdAt_tdt" timestamp,
  "currentUserRetweetId_l" int,
  favorited_b boolean,
  id_l int,
  "inReplyToStatusId_l" int,
  "inReplyToUserId_l" int,
  "possiblySensitive_b" boolean,
  provider_s text,
  "retweetCount_l" int,
  retweet_b boolean,
  "retweetedByMe_b" boolean,
  source_t text,
  text_s text,
  text_t text,
  truncated_b boolean,
  type_s text
);

Okay, time for the moment of truth!

tweetsDF.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "tweets", "keyspace" -> "tweets")).save;

And then I checked via CQL, and there was my data!

select * from tweets.tweets;

I did try to figure out how to create the Cassandra table from the tweetsDF schema, but that will have to wait till SPARKC-231 is committed. Until then, you need to manually create the table structure.
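In the meantime, writing the CREATE TABLE statement by hand gets tedious with this many columns, so here is a rough sketch of generating it from the field names. Everything in it is an assumption for illustration: the object name CqlTableSketch, the mapping from Solr dynamic-field suffixes to CQL types, and the fallback to text for fields without a known suffix. Note that it maps _l to bigint because Solr's _l fields are 64-bit longs, whereas my hand-written table uses int.

```scala
object CqlTableSketch {
  // Solr dynamic-field suffix -> CQL type (hypothetical mapping, adjust to your schema).
  val suffixTypes: Seq[(String, String)] = Seq(
    "_tdt" -> "timestamp",
    "_s"   -> "text",
    "_t"   -> "text",
    "_i"   -> "int",
    "_l"   -> "bigint",
    "_b"   -> "boolean"
  )

  // Picks the CQL type from the field's suffix, falling back to text.
  def cqlType(field: String): String =
    suffixTypes.collectFirst { case (suffix, t) if field.endsWith(suffix) => t }
      .getOrElse("text")

  // CQL lower-cases unquoted identifiers, so anything that is not plain
  // lower-case (mixed case, leading underscore) is wrapped in double quotes.
  def quoted(field: String): String =
    if (field.matches("[a-z][a-z0-9_]*")) field else "\"" + field + "\""

  // Builds the CREATE TABLE statement, marking `key` as the primary key.
  def createTable(keyspace: String, table: String, key: String, fields: Seq[String]): String = {
    val columns = fields.map { f =>
      val pk = if (f == key) " PRIMARY KEY" else ""
      s"  ${quoted(f)} ${cqlType(f)}$pk"
    }
    s"CREATE TABLE $keyspace.$table (\n${columns.mkString(",\n")}\n);"
  }
}
```

In the Spark Shell, passing tweetsDF.columns as the field list, as in CqlTableSketch.createTable("tweets", "tweets", "id", tweetsDF.columns), should produce a statement you can review and paste into cqlsh. I have not run it against every field type, so check the generated types before using it.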

The data made it into Cassandra from Twitter, via a stop in Solr land. The typical use case for moving data around is Cassandra -> Solr, but now I can copy data FROM Solr into Cassandra!

Or even better, compare datasets between Solr and other systems using Spark. That will be another post.