Blog

Building Apache Flume to use Solr 6 as a sink

Using the latest Solr with Apache Flume can be difficult because the MorphlineSolrSink in the 1.6 release is stuck supporting Solr 4. However, Cloudera contributed solr-morphlines-core and solr-morphlines-cell to the Solr project and with a few tricks Flume can be updated to the latest and greatest Solr. I’ll demonstrate some of the problems I ran into running 1.6 out of the box and how I made it through those problems. Let’s explore how to upgrade Flume to use a newer Solr library and how to build it reliably. For this article, I am focused on using apache-flume-1.6.0.bin.tar.gzapache-flume-1.6.0-src.tar.gzsolr-4.10.3.tgz. and solr-6.0.0.tgz. The first part will focus on getting Flume to work with Solr 4.10.3 and then Solr 5.2.1 and 6.0.0.

Flume doesn’t come with every plugin built out of the box but if you build the full project from source you get a more fully ready package. Also Flume expects some jars to be installed or included with external projects. I wanted to be able to just run Flume without having to juggle dependencies. With some pom file updates I was able to get a fully built Flume artifact that will work with Solr 6 out of the box. The major problem that I kept running into was dependency issues.

FLUME 1.6.0 AND SOLR 4.10.3

When I tried running Flume out of the box with the binary distribution I ran into a number of problems. Following the Flume 1.6.0 User Guide setup steps, I setup a simple spooling directory source with a MorphlineSolrSink. Next, I followed the CDK Morphlines Documentation to setup the morphlineFile. Another thing I wanted was to use the cdk-morphlines-core-stdlib grok feature. The documentation states, “Morphlines ships with several standard grok dictionaries.” but because the cdk only builds runtime dependencies the dictionaries are left out. You need to either make your own dictionary file or copy those to a well known path and use them with the dictionaryFiles path. Another option is to use the dictionary string where you just make a massive string with all the patterns you need in the config, ie

dictionaryString : """
SPACE \s*
DATA .*?
GREEDYDATA .*
"""

After following all the instructions, my pipeline was ready and I ran Flume using

bin/flume-ng agent -Dflume.root.logger=INFO,console --conf conf -f conf/flume.conf -n a1

and encountered my first error.

2016-05-23 10:30:03,848 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SINK, name: k1 started
2016-05-23 10:30:03,849 (lifecycleSupervisor-1-1) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@4c0fd2c7 counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoClassDefFoundError: org/kitesdk/morphline/api/MorphlineCompilationException
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at org.apache.flume.sink.solr.morphline.MorphlineSink.start(MorphlineSink.java:93)
        at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
        at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.kitesdk.morphline.api.MorphlineCompilationException
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 13 more
2016-05-23 10:30:03,851 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.solr.morphline.MorphlineSink.stop(MorphlineSink.java:106)] Morphline Sink k1 stopping...

This is a classic missing jar exception. Anytime you are getting null objects and java.lang.NoClassDefFoundError, a dependency is missing. FLUME-2392 indicates that Kite SDK won’t be included so you can either modify the pom file or you need the jars on your classpath somewhere. It’s probably a philosophical conversation for the best approach to containing these jars, ie install packages independently, use mvn to retrieve jars, include dependencies in the pom, etc but my approach is to try to just use Maven and it’s plugins to get the payload I want to deploy. With that in mind, let’s follow the recommendation to remove the <optional>true</optional> from the kite-morphlines-all dependency in the flume-ng-morphline-solr-sink project within the apache-flume-1.6.0-src code. After removing the optional tag, I rebuild the project at the root using

mvn clean package -Dmaven.test.skip=true

My final artifact builds into flume-ng-dist/target/apache-flume-1.6.0-bin.tar.gz.

So now that we have a new build lets try it out before we distribute the tar.gz.

cd ./flume-ng-dist/target/apache-flume-1.6.0-bin/apache-flume-1.6.0-bin
bin/flume-ng agent -Dflume.root.logger=INFO,console --conf conf -f conf/flume.conf -n a1

Blast, another error.

2016-05-24 11:15:50,052 (lifecycleSupervisor-1-2) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@401d9cde counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoClassDefFoundError: org/apache/zookeeper/KeeperException
        at org.kitesdk.morphline.solr.SanitizeUnknownSolrFieldsBuilder$SanitizeUnknownSolrFields.<init>(SanitizeUnknownSolrFieldsBuilder.java:68)
        at org.kitesdk.morphline.solr.SanitizeUnknownSolrFieldsBuilder.build(SanitizeUnknownSolrFieldsBuilder.java:52)
        at org.kitesdk.morphline.base.AbstractCommand.buildCommand(AbstractCommand.java:302)
        at org.kitesdk.morphline.base.AbstractCommand.buildCommandChain(AbstractCommand.java:249)
        at org.kitesdk.morphline.stdlib.Pipe.<init>(Pipe.java:46)
        at org.kitesdk.morphline.stdlib.PipeBuilder.build(PipeBuilder.java:40)
        at org.kitesdk.morphline.base.Compiler.compile(Compiler.java:126)
        at org.kitesdk.morphline.base.Compiler.compile(Compiler.java:55)
        at org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl.configure(MorphlineHandlerImpl.java:101)
        at org.apache.flume.sink.solr.morphline.MorphlineSink.start(MorphlineSink.java:97)
        at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
        at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.zookeeper.KeeperException
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 20 more

So the core problem here again is a dependency is missing; ZooKeeper in this case. FLUME-2840 offers a patch which removes the test scope from all ZooKeeper. Even though the parent project has ZooKeeper in a compile time scope, the default Maven profiles that are set, hbase-98, maven-3, nonThrift, not-windows (if running linux), ossrh and hbase-98 scopes ZooKeeper as a test dependency so it is overriding the parent. So download the FLUME-2840.patch and fix it up.

curl -O https://issues.apache.org/jira/secure/attachment/12783022/FLUME-2840.patch
patch -p1 <FLUME-2840.patch

Okay so that fixes the ZooKeeper dependencies. Firing up flume after another build gives the following warning.

2016-05-24 11:50:45,030 (lifecycleSupervisor-1-1) [INFO - org.apache.solr.common.cloud.ConnectionManager.waitForConnected(ConnectionManager.java:207)] Waiting for client to connect to ZooKeeper
2016-05-24 11:50:45,032 (lifecycleSupervisor-1-1-SendThread(localhost:2181)) [INFO - org.apache.zookeeper.ClientCnxn$SendThread.logStartConnect(ClientCnxn.java:966)] Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2016-05-24 11:50:45,088 (lifecycleSupervisor-1-1-SendThread(localhost:2181)) [WARN - org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1089)] Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:350)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1068)

This warning is great! It means we finally have our Flume process trying to talk to Solr. Go ahead and fire up the example Solr 4.10.3 in cloud mode with

bin/solr -c -e cloud -noprompt

Keep in mind that the example Solr cloud starts a ZooKeeper instance on the port 1000 greater than the Solr port, so 8983 for Solr and 9983 for ZooKeeper. Fire up Flume and drop a message in a file in the spooling directory, like

echo joe > /tmp/messages/joe-messages

We see the following.

[INFO - org.kitesdk.morphline.stdlib.LogInfoBuilder$LogInfo.log(LogInfoBuilder.java:64)] output record: [{id=[joe]}]

So far so good. Let’s see if it gets to ZooKeeper.

[INFO - org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:849)] Socket connection established to localhost/127.0.0.1:9983, initiating session

Check again! Okay this message is going through right!?

2016-05-24 12:42:50,112 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.solr.morphline.MorphlineSink.process(MorphlineSink.java:163)] Morphline Sink k1: Unable to process event from channel c1. Exception follows.
org.apache.solr.client.solrj.impl.CloudSolrServer$RouteException: org/apache/http/pool/ConnPoolControl
        at org.apache.solr.client.solrj.impl.CloudSolrServer.directUpdate(CloudSolrServer.java:360)
        at org.apache.solr.client.solrj.impl.CloudSolrServer.request(CloudSolrServer.java:533)
        at org.apache.solr.client.solrj.request.AbstractUpdateRequest.process(AbstractUpdateRequest.java:124)
        at org.apache.solr.client.solrj.SolrServer.add(SolrServer.java:68)
        at org.apache.solr.client.solrj.SolrServer.add(SolrServer.java:54)
        at org.kitesdk.morphline.solr.SolrServerDocumentLoader.sendLoads(SolrServerDocumentLoader.java:140)
        at org.kitesdk.morphline.solr.SolrServerDocumentLoader.sendBatch(SolrServerDocumentLoader.java:131)
        at org.kitesdk.morphline.solr.SolrServerDocumentLoader.commitTransaction(SolrServerDocumentLoader.java:94)
        at org.kitesdk.morphline.solr.LoadSolrBuilder$LoadSolr.doNotify(LoadSolrBuilder.java:104)
        at org.kitesdk.morphline.base.AbstractCommand.notify(AbstractCommand.java:132)
        at org.kitesdk.morphline.base.Connector.notify(Connector.java:57)
        at org.kitesdk.morphline.base.AbstractCommand.doNotify(AbstractCommand.java:150)
        at org.kitesdk.morphline.base.AbstractCommand.notify(AbstractCommand.java:132)
        at org.kitesdk.morphline.base.Connector.notify(Connector.java:57)
        at org.kitesdk.morphline.base.AbstractCommand.doNotify(AbstractCommand.java:150)
        at org.kitesdk.morphline.base.AbstractCommand.notify(AbstractCommand.java:132)
        at org.kitesdk.morphline.base.Connector.notify(Connector.java:57)
        at org.kitesdk.morphline.base.AbstractCommand.doNotify(AbstractCommand.java:150)
        at org.kitesdk.morphline.base.AbstractCommand.notify(AbstractCommand.java:132)
        at org.kitesdk.morphline.base.Connector.notify(Connector.java:57)
        at org.kitesdk.morphline.base.AbstractCommand.doNotify(AbstractCommand.java:150)
        at org.kitesdk.morphline.base.AbstractCommand.notify(AbstractCommand.java:132)
        at org.kitesdk.morphline.base.AbstractCommand.doNotify(AbstractCommand.java:150)
        at org.kitesdk.morphline.base.AbstractCommand.notify(AbstractCommand.java:132)
        at org.kitesdk.morphline.base.Notifications.notify(Notifications.java:96)
        at org.kitesdk.morphline.base.Notifications.notifyCommitTransaction(Notifications.java:61)
        at org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl.commitTransaction(MorphlineHandlerImpl.java:148)
        at org.apache.flume.sink.solr.morphline.MorphlineSink.process(MorphlineSink.java:156)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NoClassDefFoundError: org/apache/http/pool/ConnPoolControl
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:118)
        at org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:466)
        at org.apache.http.impl.client.AbstractHttpClient.createHttpContext(AbstractHttpClient.java:286)
        at org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:851)
        at org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:805)
        at org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:784)
        at org.apache.solr.client.solrj.impl.HttpSolrServer.executeMethod(HttpSolrServer.java:448)
        at org.apache.solr.client.solrj.impl.HttpSolrServer.request(HttpSolrServer.java:210)
        at org.apache.solr.client.solrj.impl.HttpSolrServer.request(HttpSolrServer.java:206)
        at org.apache.solr.client.solrj.impl.LBHttpSolrServer.doRequest(LBHttpSolrServer.java:340)
        at org.apache.solr.client.solrj.impl.LBHttpSolrServer.request(LBHttpSolrServer.java:301)
        at org.apache.solr.client.solrj.impl.CloudSolrServer$1.call(CloudSolrServer.java:341)
        at org.apache.solr.client.solrj.impl.CloudSolrServer$1.call(CloudSolrServer.java:338)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        ... 1 more
Caused by: java.lang.ClassNotFoundException: org.apache.http.pool.ConnPoolControl
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 29 more

Back to dependency hell. What’s going on here? The root error is Caused by: java.lang.ClassNotFoundException: org.apache.http.pool.ConnPoolControl which indicates a problem with the httpclient dependency in the org.apache.httpcomponents group. We can try to track down the discrepancy by using Maven’s maven-dependency-plugin dependency:tree command.

mvn dependency:tree -Dverbose -Dincludes=org.apache.httpcomponents:httpclient
[INFO] ------------------------------------------------------------------------
[INFO] Building Flume NG Morphline Solr Sink 1.6.0
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ flume-ng-morphline-solr-sink ---
[INFO] org.apache.flume.flume-ng-sinks:flume-ng-morphline-solr-sink:jar:1.6.0
[INFO] +- org.apache.flume:flume-ng-core:jar:1.6.0:compile
[INFO] |  \- org.apache.thrift:libthrift:jar:0.9.0:compile
[INFO] |     \- org.apache.httpcomponents:httpclient:jar:4.2.1:compile (version managed from 4.1.3)
[INFO] \- org.kitesdk:kite-morphlines-all:pom:1.0.0:compile
[INFO]    \- org.kitesdk:kite-morphlines-solr-core:jar:1.0.0:compile
[INFO]       \- org.apache.solr:solr-core:jar:4.10.3:compile
[INFO]          +- org.apache.hadoop:hadoop-auth:jar:2.4.0:compile (version managed from 2.2.0)
[INFO]          |  \- (org.apache.httpcomponents:httpclient:jar:4.2.1:compile - version managed from 4.2.5; omitted for duplicate)
[INFO]          \- (org.apache.httpcomponents:httpclient:jar:4.2.1:compile - version managed from 4.3.1; omitted for duplicate)

It appears that solr-core, and hadoop-auth directly, is dependent on a later httpclient version than what is provided during compile time due to Flume’s parent pom. So let’s bump httpclient from 4.2.1 to 4.3.1 in Flume’s parent pom.xml and build again. Run it all and, boom, another exception.

2016-05-24 13:02:14,049 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
        at org.apache.flume.sink.solr.morphline.MorphlineSink.process(MorphlineSink.java:186)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.solr.client.solrj.impl.CloudSolrServer$RouteException: org/apache/http/concurrent/Cancellable
        at org.apache.solr.client.solrj.impl.CloudSolrServer.directUpdate(CloudSolrServer.java:360)
        at org.apache.solr.client.solrj.impl.CloudSolrServer.request(CloudSolrServer.java:533)
        at org.apache.solr.client.solrj.request.AbstractUpdateRequest.process(AbstractUpdateRequest.java:124)
        at org.apache.solr.client.solrj.SolrServer.add(SolrServer.java:68)
        at org.apache.solr.client.solrj.SolrServer.add(SolrServer.java:54)
        at org.kitesdk.morphline.solr.SolrServerDocumentLoader.sendLoads(SolrServerDocumentLoader.java:140)
        at org.kitesdk.morphline.solr.SolrServerDocumentLoader.sendBatch(SolrServerDocumentLoader.java:131)
        at org.kitesdk.morphline.solr.SolrServerDocumentLoader.commitTransaction(SolrServerDocumentLoader.java:94)
        at org.kitesdk.morphline.solr.LoadSolrBuilder$LoadSolr.doNotify(LoadSolrBuilder.java:104)
        at org.kitesdk.morphline.base.AbstractCommand.notify(AbstractCommand.java:132)
        at org.kitesdk.morphline.base.Connector.notify(Connector.java:57)
        at org.kitesdk.morphline.base.AbstractCommand.doNotify(AbstractCommand.java:150)
        at org.kitesdk.morphline.base.AbstractCommand.notify(AbstractCommand.java:132)
        at org.kitesdk.morphline.base.Connector.notify(Connector.java:57)
        at org.kitesdk.morphline.base.AbstractCommand.doNotify(AbstractCommand.java:150)
        at org.kitesdk.morphline.base.AbstractCommand.notify(AbstractCommand.java:132)
        at org.kitesdk.morphline.base.Connector.notify(Connector.java:57)
        at org.kitesdk.morphline.base.AbstractCommand.doNotify(AbstractCommand.java:150)
        at org.kitesdk.morphline.base.AbstractCommand.notify(AbstractCommand.java:132)
        at org.kitesdk.morphline.base.Connector.notify(Connector.java:57)
        at org.kitesdk.morphline.base.AbstractCommand.doNotify(AbstractCommand.java:150)
        at org.kitesdk.morphline.base.AbstractCommand.notify(AbstractCommand.java:132)
        at org.kitesdk.morphline.base.AbstractCommand.doNotify(AbstractCommand.java:150)
        at org.kitesdk.morphline.base.AbstractCommand.notify(AbstractCommand.java:132)
        at org.kitesdk.morphline.base.Notifications.notify(Notifications.java:96)
        at org.kitesdk.morphline.base.Notifications.notifyCommitTransaction(Notifications.java:61)
        at org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl.commitTransaction(MorphlineHandlerImpl.java:148)
        at org.apache.flume.sink.solr.morphline.MorphlineSink.process(MorphlineSink.java:156)
        ... 3 more
Caused by: java.lang.NoClassDefFoundError: org/apache/http/concurrent/Cancellable
        at org.apache.solr.client.solrj.impl.HttpSolrServer.createMethod(HttpSolrServer.java:380)
        at org.apache.solr.client.solrj.impl.HttpSolrServer.request(HttpSolrServer.java:210)
        at org.apache.solr.client.solrj.impl.HttpSolrServer.request(HttpSolrServer.java:206)
        at org.apache.solr.client.solrj.impl.LBHttpSolrServer.doRequest(LBHttpSolrServer.java:340)
        at org.apache.solr.client.solrj.impl.LBHttpSolrServer.request(LBHttpSolrServer.java:301)
        at org.apache.solr.client.solrj.impl.CloudSolrServer$1.call(CloudSolrServer.java:341)
        at org.apache.solr.client.solrj.impl.CloudSolrServer$1.call(CloudSolrServer.java:338)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        ... 1 more
Caused by: java.lang.ClassNotFoundException: org.apache.http.concurrent.Cancellable
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 11 more

Searching around yields that org.apache.http.concurrent.Cancellable is part of httpcore so lets add that to our parent dependency tree.

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpcore</artifactId>
    <version>4.3.1</version>
</dependency>

Build and run and it appears we have no errors. Let’s commit to Solr and see if our document is there.

curl 'localhost:8983/solr/gettingstarted/update?commit=true'
curl 'localhost:8983/solr/gettingstarted/select?q=*:*&wt=json&indent=true&rows=0'
{
  "responseHeader":{
    "status":0,
    "QTime":5,
    "params":{
      "q":"*:*",
      "indent":"true",
      "rows":"0",
      "wt":"json"}},
  "response":{"numFound":1,"start":0,"maxScore":1.0,"docs":[]
  }}

Hurrah! We have numFound equal to one. Our document is successfully loaded and we have used Flume 1.6 (with updates) to load our data into Solr 4.10.3.

So getting Flume to run Solr 4.X is not exactly easy at first glance but it works.

What happens if we try to use this setup against Solr 5.2.1? Before running 5, make sure you shutdown any other Solr 4.10.3 instances running

bin/solr stop -all

Then go ahead in the solr-5.2.1 directory and create the cloud example as we did before.

bin/solr -c -e cloud -noprompt

When we try to index, another ERROR occurs.

2016-05-24 13:54:22,748 (lifecycleSupervisor-1-1) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@401d9cde counterGroup:{ name:null counters:{} } } - Exception follows.
org.apache.solr.common.SolrException: Plugin Initializing failure for [schema.xml] fieldType. Schema file is /tmp/1464112462569-0/conf/managed-schema
        at org.apache.solr.schema.IndexSchema.readSchema(IndexSchema.java:595)
        at org.apache.solr.schema.IndexSchema.<init>(IndexSchema.java:166)
        at org.apache.solr.schema.ManagedIndexSchema.<init>(ManagedIndexSchema.java:72)
        at org.apache.solr.schema.ManagedIndexSchemaFactory.create(ManagedIndexSchemaFactory.java:171)
        at org.apache.solr.schema.ManagedIndexSchemaFactory.create(ManagedIndexSchemaFactory.java:45)
        at org.apache.solr.schema.IndexSchemaFactory.buildIndexSchema(IndexSchemaFactory.java:69)
        at org.kitesdk.morphline.solr.SolrLocator.getIndexSchema(SolrLocator.java:181)
        at org.kitesdk.morphline.solr.SanitizeUnknownSolrFieldsBuilder$SanitizeUnknownSolrFields.<init>(SanitizeUnknownSolrFieldsBuilder.java:70)
        at org.kitesdk.morphline.solr.SanitizeUnknownSolrFieldsBuilder.build(SanitizeUnknownSolrFieldsBuilder.java:52)
        at org.kitesdk.morphline.base.AbstractCommand.buildCommand(AbstractCommand.java:302)
        at org.kitesdk.morphline.base.AbstractCommand.buildCommandChain(AbstractCommand.java:249)
        at org.kitesdk.morphline.stdlib.Pipe.<init>(Pipe.java:46)
        at org.kitesdk.morphline.stdlib.PipeBuilder.build(PipeBuilder.java:40)
        at org.kitesdk.morphline.base.Compiler.compile(Compiler.java:126)
        at org.kitesdk.morphline.base.Compiler.compile(Compiler.java:55)
        at org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl.configure(MorphlineHandlerImpl.java:101)
        at org.apache.flume.sink.solr.morphline.MorphlineSink.start(MorphlineSink.java:97)
        at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
        at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.solr.common.SolrException: Plugin Initializing failure for [schema.xml] fieldType
        at org.apache.solr.util.plugin.AbstractPluginLoader.load(AbstractPluginLoader.java:193)
        at org.apache.solr.schema.IndexSchema.readSchema(IndexSchema.java:486)
        ... 26 more
Caused by: org.apache.solr.common.SolrException: Must specify units="degrees" on field types with class SpatialRecursivePrefixTreeFieldType
        at org.apache.solr.schema.AbstractSpatialFieldType.init(AbstractSpatialFieldType.java:113)
        at org.apache.solr.schema.AbstractSpatialPrefixTreeFieldType.init(AbstractSpatialPrefixTreeFieldType.java:43)
        at org.apache.solr.schema.SpatialRecursivePrefixTreeFieldType.init(SpatialRecursivePrefixTreeFieldType.java:37)
        at org.apache.solr.schema.FieldType.setArgs(FieldType.java:166)
        at org.apache.solr.schema.FieldTypePluginLoader.init(FieldTypePluginLoader.java:141)
        at org.apache.solr.schema.FieldTypePluginLoader.init(FieldTypePluginLoader.java:43)
        at org.apache.solr.util.plugin.AbstractPluginLoader.load(AbstractPluginLoader.java:190)
        ... 27 more

This error is due to the sample configuration using a managed-schema in combination with the org.kitesdk.morphline.solr.SolrLocator. It just doesn’t know how to handle a managed-schema. To work around this, start a Solr Cloud example but choose the basic_config instead. Trying again, we get a very similar ERROR.

2016-05-24 14:01:37,506 (lifecycleSupervisor-1-7) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@401d9cde counterGroup:{ name:null counters:{} } } - Exception follows.
org.apache.solr.common.SolrException: Plugin Initializing failure for [schema.xml] fieldType. Schema file is /tmp/1464112897417-0/conf/schema.xml
        at org.apache.solr.schema.IndexSchema.readSchema(IndexSchema.java:595)
        at org.apache.solr.schema.IndexSchema.<init>(IndexSchema.java:166)
        at org.apache.solr.schema.IndexSchemaFactory.create(IndexSchemaFactory.java:55)
        at org.apache.solr.schema.IndexSchemaFactory.buildIndexSchema(IndexSchemaFactory.java:69)
        at org.kitesdk.morphline.solr.SolrLocator.getIndexSchema(SolrLocator.java:181)
        at org.kitesdk.morphline.solr.SanitizeUnknownSolrFieldsBuilder$SanitizeUnknownSolrFields.<init>(SanitizeUnknownSolrFieldsBuilder.java:70)
        at org.kitesdk.morphline.solr.SanitizeUnknownSolrFieldsBuilder.build(SanitizeUnknownSolrFieldsBuilder.java:52)
        at org.kitesdk.morphline.base.AbstractCommand.buildCommand(AbstractCommand.java:302)
        at org.kitesdk.morphline.base.AbstractCommand.buildCommandChain(AbstractCommand.java:249)
        at org.kitesdk.morphline.stdlib.Pipe.<init>(Pipe.java:46)
        at org.kitesdk.morphline.stdlib.PipeBuilder.build(PipeBuilder.java:40)
        at org.kitesdk.morphline.base.Compiler.compile(Compiler.java:126)
        at org.kitesdk.morphline.base.Compiler.compile(Compiler.java:55)
        at org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl.configure(MorphlineHandlerImpl.java:101)
        at org.apache.flume.sink.solr.morphline.MorphlineSink.start(MorphlineSink.java:97)
        at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
        at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.solr.common.SolrException: Plugin Initializing failure for [schema.xml] fieldType
        at org.apache.solr.util.plugin.AbstractPluginLoader.load(AbstractPluginLoader.java:193)
        at org.apache.solr.schema.IndexSchema.readSchema(IndexSchema.java:486)
        ... 24 more
Caused by: org.apache.solr.common.SolrException: Must specify units="degrees" on field types with class SpatialRecursivePrefixTreeFieldType
        at org.apache.solr.schema.AbstractSpatialFieldType.init(AbstractSpatialFieldType.java:113)
        at org.apache.solr.schema.AbstractSpatialPrefixTreeFieldType.init(AbstractSpatialPrefixTreeFieldType.java:43)
        at org.apache.solr.schema.SpatialRecursivePrefixTreeFieldType.init(SpatialRecursivePrefixTreeFieldType.java:37)
        at org.apache.solr.schema.FieldType.setArgs(FieldType.java:166)
        at org.apache.solr.schema.FieldTypePluginLoader.init(FieldTypePluginLoader.java:141)
        at org.apache.solr.schema.FieldTypePluginLoader.init(FieldTypePluginLoader.java:43)
        at org.apache.solr.util.plugin.AbstractPluginLoader.load(AbstractPluginLoader.java:190)
        ... 25 more

It appears that we are hitting a wall with the org.kitesdk.morphline.solr.SolrLocator object and need to get Kite SDK updated.

UPDATING SOLR ON FLUME

Flume is well designed for modularity and we can take advantage of so good design decisions by Cloudera to update Flume to use a later Solr version. As mentioned at the beginning, Cloudera contributed solr-morphlines-core and solr-morphlines-cell to Apache Solr directly and they kept the function calls the exact same. So we can exclude the cdk-morphlines-solr-core and cdk-morphlines-solr-cell dependencies and instead use the official Solr libraries instead. KITE-999 briefly discusses this switch but doesn’t really spell it out so it is sort of that situation where to know what to do, you already have to know what to do.

So to accomplish this, lets update the flume-ng-morphline-solr-sink pom.xml to exclude the two cdk resources and instead include the solr dependencies. First we update the kite-morphlines-all to exclude these portions.

<exclusion>
  <groupId>org.kitesdk</groupId>
  <artifactId>cdk-morphlines-solr-core</artifactId>
</exclusion>
<exclusion>
  <groupId>org.kitesdk</groupId>
  <artifactId>cdk-morphlines-solr-cell</artifactId>
</exclusion>

Then we want to add the new Solr resources.

<dependency>
  <groupId>org.apache.solr</groupId>
  <artifactId>solr-morphlines-core</artifactId>
  <version>5.2.1</version>
</dependency>

<dependency>
  <groupId>org.apache.solr</groupId>
  <artifactId>solr-cell</artifactId>
  <version>5.2.1</version>
</dependency>

Finally in the parent pom.xml there is a solr-cell entry that needs to be updated to 5.2.1 as well. Because we are excluding the cdk-morphlines-solr-core the SolrLocator call ends up using org.apache.solr.morphlines.solr SolrLocator instead of org.kitesdk.morphline.solr SolrLocator. Let’s see what happens now when we use Flume on Solr 5.2.1.

2016-05-24 14:26:10,816 (lifecycleSupervisor-1-1) [INFO - org.apache.solr.core.SolrResourceLoader.<init>(SolrResourceLoader.java:138)] new SolrResourceLoader for directory: '/tmp/1464114370809-0/'
2016-05-24 14:26:11,091 (lifecycleSupervisor-1-1) [INFO - org.apache.solr.core.SolrConfig.refreshRequestParams(SolrConfig.java:927)] current version of requestparams : -1
2016-05-24 14:26:11,188 (lifecycleSupervisor-1-1) [INFO - org.apache.solr.update.SolrIndexConfig.<init>(SolrIndexConfig.java:158)] IndexWriter infoStream solr logging is enabled
2016-05-24 14:26:11,192 (lifecycleSupervisor-1-1) [INFO - org.apache.solr.core.SolrConfig.<init>(SolrConfig.java:239)] Using Lucene MatchVersion: 5.2.1
2016-05-24 14:26:11,263 (lifecycleSupervisor-1-1) [INFO - org.apache.solr.core.SolrConfig.<init>(SolrConfig.java:329)] Loaded SolrConfig: solrconfig.xml
2016-05-24 14:26:11,266 (lifecycleSupervisor-1-1) [INFO - org.apache.solr.schema.IndexSchema.readSchema(IndexSchema.java:450)] Reading Solr Schema from /tmp/1464114370809-0/conf/schema.xml
2016-05-24 14:26:11,296 (lifecycleSupervisor-1-1) [INFO - org.apache.solr.schema.IndexSchema.readSchema(IndexSchema.java:478)] [null] Schema name=example
2016-05-24 14:26:11,489 (lifecycleSupervisor-1-1) [INFO - org.apache.solr.schema.IndexSchema.readSchema(IndexSchema.java:577)] unique key field: id

Great and after testing a document ingestion, everything works as expected and we have successfully integrated Flume with Solr 5!

If you bump those depedencies to 6.0.0, Flume with work with Solr 6! Fantastic.

CONCLUSION

Wow getting projects to work together can sometimes be harder than it appears at first. At OSC we love tackling these hard problems and getting your data searchable get in touch to see how we can help today!

CODE

The preceding examples used the following config files.

You can find everything ready to compile from these sources:

Just because we got everything working doesn’t mean there aren’t some nagging things left over.

Managed Schema

I can hear you saying, “Joe what about a managed schema? You just glossed over the fact that it isn’t working!” And you are totally right and the answer is, it’s complicated. Didn’t we encounter an error when using it? Yes indeed we did but on May 29, 2014, Gregory Chanan made the commit Give SolrLocator the ability to handle managed schemas. which is included in release-1.1.0 release-1.0.0 release-0.18.0 release-0.17.1 release-0.17.0 release-0.16.0 release-0.15.0 of Kite SDK, however it only supports Solr 4 and so some of the enhancements haven’t been ported to the Solr distribution. So in this case, we are kinda stuck. Perhaps a little love will be given to the Solr release to enable the fix.

Old References in Flume Documentation

After running Flume the first time and following the documentation, I got the following error.

2016-05-24 10:55:19,330 (lifecycleSupervisor-1-0) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@401d9cde counterGroup:{ name:null counters:{} } } - Exception follows.
org.kitesdk.morphline.api.MorphlineCompilationException: No command builder registered for name: readLine near: {
    # conf/morphline.conf: 13
    "readLine" : {
        # conf/morphline.conf: 14
        "charset" : "UTF-8"
    }
}
        at org.kitesdk.morphline.base.AbstractCommand.buildCommand(AbstractCommand.java:281)
        at org.kitesdk.morphline.base.AbstractCommand.buildCommandChain(AbstractCommand.java:249)
        at org.kitesdk.morphline.stdlib.Pipe.<init>(Pipe.java:46)
        at org.kitesdk.morphline.stdlib.PipeBuilder.build(PipeBuilder.java:40)
        at org.kitesdk.morphline.base.Compiler.compile(Compiler.java:126)
        at org.kitesdk.morphline.base.Compiler.compile(Compiler.java:55)
        at org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl.configure(MorphlineHandlerImpl.java:101)
        at org.apache.flume.sink.solr.morphline.MorphlineSink.start(MorphlineSink.java:97)
        at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
        at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

A Google search later, it turns out that the link in the Flume documentation leads to the Cloudera Development Kit which was renamed in 2013 to Kite SDK and the error happened because my morphlines.conf contained importCommands : ["com.cloudera.**", "org.apache.solr.**"] which was what the old docs which Flume linked to said. The project is actually using the new Kite SDK so updating the line to importCommands : ["org.kitesdk.**", "org.apache.solr.**"] fixed it up.