Friday, May 25, 2012

Flume-Solr Integration

This post describes how to integrate Flume with Solr. I have created a new sink for this purpose. The sink is typically used with the regexAll decorator, which performs a light transformation of event data into attributes. These attributes are converted into a Solr document and committed to Solr.

What is Solr
Solr is an open source enterprise search server based on Lucene. Solr is written in Java and runs as a standalone full-text search server within a servlet container such as Tomcat. Solr uses the Lucene Java search library at its core for full-text indexing and search, and has REST-like HTTP/XML and JSON APIs that make it easy to use from virtually any programming language.

What is Flume
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming
data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. The system is centrally managed and allows for intelligent dynamic management.

I have used flume-0.9.3 and apache-solr-3.1.0 for this POC.

The RegexAllExtractor decorator prepares events whose attributes are ready to be written into Solr. Implementing a RegexAllExtractor decorator is straightforward.

package com.cloudera.flume.core.extractors;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory.SinkDecoBuilder;
import com.cloudera.flume.core.Attributes;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSinkDecorator;
import com.cloudera.util.Pair;
import com.google.common.base.Preconditions;

/**
 * This takes a regex and any number of attribute names to assign to each
 * sub pattern in pattern order.
 * Example 1:
 *   regexAll("(\d+):(\d+):(\d+)", "s1, s2, s3")
 *   "123:456:789" -> {s1:123, s2:456, s3:789}
 * Example 2:
 *   regexAll("(\d+):(\d+):(\d+)", "s1, s2")
 *   "123:456:789" -> {s1:123, s2:456}
 * Example 3:
 *   regexAll("(\d+):(\d+):(\d+)", "s1, , s2")
 *   "123:456:789" -> {s1:123, s2:789}
 */
public class RegexAllExtractor extends EventSinkDecorator<EventSink> {
  final Pattern pat;
  final List<String> names;

  /**
   * This constructor will not throw an exception.
   */
  public RegexAllExtractor(EventSink snk, Pattern pat, List<String> names) {
    super(snk);
    this.pat = pat;
    this.names = names;
  }

  /**
   * Convenience constructor that may throw a PatternSyntaxException (runtime
   * exception).
   */
  public RegexAllExtractor(EventSink snk, String regex, List<String> names) {
    this(snk, Pattern.compile(regex), names);
  }

  @Override
  public void append(Event e) throws IOException, InterruptedException {
    String s = new String(e.getBody());
    Matcher m = pat.matcher(s);
    String val = "";
    int grpCnt = m.groupCount();

    if (m.find()) {
      for (int grp = 1; grp <= grpCnt; grp++) {
        val = "";
        try {
          val = m.group(grp);
        } catch (IndexOutOfBoundsException ioobe) {
          val = "";
        }

        // Try/catch so that we don't require there be the same number of names as patterns.
        try {
          // Ignore blank names. These are most likely sub patterns we don't care about keeping.
          if (!"".equals(names.get(grp - 1))) {
            Attributes.setString(e, names.get(grp - 1), val);
          }
        } catch (IndexOutOfBoundsException ioobe) {
          // Fewer names than groups; ignore the extra groups.
        }
      }
    }
    super.append(e);
  }

  public static SinkDecoBuilder builder() {
    return new SinkDecoBuilder() {
      @Override
      public EventSinkDecorator<EventSink> build(Context context, String... argv) {
        Preconditions.checkArgument(argv.length == 2, "usage: regexAll(\"regex\", \"col1,col2\")");
        String regex = argv[0];
        Pattern pat = Pattern.compile(regex);
        String name = argv[1];
        ArrayList<String> nameList = new ArrayList<String>();
        String[] names = name.split("\\,");
        for (int i = 0; i < names.length; ++i) {
          // Trim so that "s1, s2, s3" and "s1,s2,s3" behave the same.
          nameList.add(names[i].trim());
        }
        EventSinkDecorator<EventSink> snk = new RegexAllExtractor(null, pat, nameList);
        return snk;
      }
    };
  }

  /**
   * This is a special function used by the SourceFactory to pull in this class
   * as a plugin decorator.
   */
  public static List<Pair<String, SinkDecoBuilder>> getDecoratorBuilders() {
    List<Pair<String, SinkDecoBuilder>> builders =
        new ArrayList<Pair<String, SinkDecoBuilder>>();
    builders.add(new Pair<String, SinkDecoBuilder>("regexAll", builder()));
    return builders;
  }
}

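The extraction logic can be exercised in isolation with plain java.util.regex, without any Flume classes. The following is a hypothetical standalone sketch of what the decorator does (the names RegexAllDemo and extract are mine, not part of the plugin):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexAllDemo {

    // Standalone re-implementation of the decorator's extraction loop:
    // each capture group is mapped to the name at the same position,
    // and blank names are skipped (Example 3 in the javadoc above).
    static Map<String, String> extract(String regex, List<String> names, String body) {
        Map<String, String> attrs = new LinkedHashMap<String, String>();
        Matcher m = Pattern.compile(regex).matcher(body);
        if (m.find()) {
            for (int grp = 1; grp <= m.groupCount(); grp++) {
                String val = (m.group(grp) == null) ? "" : m.group(grp);
                // Skip groups with no corresponding name, and blank names.
                if (grp - 1 < names.size() && !"".equals(names.get(grp - 1))) {
                    attrs.put(names.get(grp - 1), val);
                }
            }
        }
        return attrs;
    }

    public static void main(String[] args) {
        // Example 3 from the javadoc: the blank name drops group 2.
        System.out.println(extract("(\\d+):(\\d+):(\\d+)",
                Arrays.asList("s1", "", "s2"), "123:456:789"));
        // prints {s1=123, s2=789}
    }
}
```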
The Flume-Solr sink commits a single record to a Solr server for each Flume event. The sink has one input attribute, url: the URL of the output Solr server. Yes, that means one sink can currently be configured to output to just one Solr server. Implementing the SolrEventSink sink is straightforward.

package com.cloudera.flume.handlers.solr;

import java.io.IOException;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.util.Pair;
import com.google.common.base.Preconditions;

/**
 * This class converts the attributes added to a Flume event by the
 * RegexAllExtractor class into a Solr document and stores it in Solr.
 */
public class SolrEventSink extends EventSink.Base {

  static final Logger log = LoggerFactory.getLogger(SolrEventSink.class);
  private CommonsHttpSolrServer server;
  private String solrURL;

  public SolrEventSink(String url) {
    this.solrURL = url;
    log.info("solr server url: " + solrURL);
    try {
      this.server = new CommonsHttpSolrServer(solrURL);
    } catch (MalformedURLException e) {
      log.error("Invalid Solr URL");
    }
  }

  @Override
  public void append(Event e) throws IOException {
    // Generate the Solr document from the event attributes, skipping
    // Flume's internal bookkeeping attributes.
    SolrInputDocument document = new SolrInputDocument();
    for (Entry<String, byte[]> a : e.getAttrs().entrySet()) {
      if (!a.getKey().equals("AckType") && !a.getKey().equals("AckTag")
          && !a.getKey().equals("AckChecksum") && !a.getKey().equals("tailSrcFile")) {
        document.addField(a.getKey(), Bytes.toString(a.getValue()));
      }
    }
    try {
      // Add the document to Solr.
      server.add(document);
      // Commit the data to Solr.
      server.commit();
    } catch (SolrServerException e1) {
      log.error("SolrServerException: Exception communicating with the Solr Server instance");
    } catch (SolrException exception) {
      log.error("Solr Exception: " + exception);
    } catch (Exception exception) {
      log.error("Exception: " + exception);
    }
    super.append(e);
  }

  public static SinkBuilder builder() {
    return new SinkBuilder() {
      @Override
      public EventSink build(Context context, String... argv) {
        Preconditions.checkArgument(argv.length == 1,
            "usage: solrEventSink(serverURL)");
        return new SolrEventSink(argv[0]);
      }
    };
  }

  /**
   * This is a special function used by the SourceFactory to pull in this class
   * as a plugin sink.
   */
  public static List<Pair<String, SinkBuilder>> getSinkBuilders() {
    List<Pair<String, SinkBuilder>> builders =
        new ArrayList<Pair<String, SinkBuilder>>();
    builders.add(new Pair<String, SinkBuilder>("solrEventSink", builder()));
    return builders;
  }
}

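The sink's filtering of Flume's internal bookkeeping attributes can likewise be sketched without a live Solr server. This standalone helper (hypothetical; SolrFieldMapper and toFields are not part of the plugin) shows which event attributes end up as Solr fields:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SolrFieldMapper {

    // Flume bookkeeping attributes excluded in SolrEventSink.append().
    static final List<String> INTERNAL =
            Arrays.asList("AckType", "AckTag", "AckChecksum", "tailSrcFile");

    // Convert event attributes to plain field name/value pairs, the same
    // way the sink populates its SolrInputDocument.
    static Map<String, String> toFields(Map<String, byte[]> attrs) {
        Map<String, String> fields = new LinkedHashMap<String, String>();
        for (Map.Entry<String, byte[]> a : attrs.entrySet()) {
            if (!INTERNAL.contains(a.getKey())) {
                fields.put(a.getKey(), new String(a.getValue()));
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        Map<String, byte[]> attrs = new LinkedHashMap<String, byte[]>();
        attrs.put("id", "1".getBytes());
        attrs.put("AckType", "ack".getBytes());   // internal: dropped
        attrs.put("cat", "books".getBytes());
        System.out.println(toFields(attrs)); // prints {id=1, cat=books}
    }
}
```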
You need to add the flume-plugin-solrEventSink and flume-plugin-regexAllExtractor jars to Flume's lib directory. You can compile them from the Flume sources against the Solr jars; the following jars must then also be added to Flume's lib directory.

Jar names : 

The last step is to add the plugins to the Flume configuration file (flume-conf.xml).
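In Flume 0.9.x, plugins are registered through the flume.plugin.classes property. Assuming the package names used in the code above, the entry would look like this (depending on your installation, the property may live in flume-conf.xml or flume-site.xml):

```xml
<property>
  <name>flume.plugin.classes</name>
  <value>com.cloudera.flume.core.extractors.RegexAllExtractor,com.cloudera.flume.handlers.solr.SolrEventSink</value>
  <description>Comma-separated list of plugin classes to load</description>
</property>
```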



Note: The Solr schema.xml file must contain field entries for the column names specified in the regexAll decorator configuration. Otherwise an invalid column name error occurs and the document is not saved in Solr.

Configuration of Flume agent:
    source : tail("path/to/log/file")
    sink :  agentSink("collector_machine_name",35853)

Configuration of Flume collector:
    source : collectorSource(35853)
    sink : {regexAll("regex","column_names") => solrEventSink("solr_server_url")}


Run Solr server:
    $ cd SOLR_HOME
    $ java -Dsolr.solr.home=path/to/SOLR_HOME/solr/  -Djetty.port=12000 -jar start.jar

Create a file test.log and add the following lines:
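The original sample lines are not shown here; any comma-separated triples matching the collector's regex would do, for example (hypothetical data):

```
1,books,Hadoop Guide
2,electronics,Keyboard
3,books,Lucene in Action
```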

Run Flume master:

    $ cd FLUME_HOME
    $ bin/flume master
Run Flume agent:
    $ cd FLUME_HOME
    $ bin/flume node -n test_agent
Run Flume collector:
    $ cd FLUME_HOME 
    $ bin/flume node -n test_collector

Configuration of Flume agent:
    source : tail("/path/to/test.log")
    sink :  agentSink("localhost",35853) 

Configuration of Flume collector:
    source : collectorSource(35853)
    sink : {regexAll("([^,]*),([^,]*),([^,]*)","id,cat,name")=>solrEventSink("http://hadoop-namenode:12000/solr")}
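To sanity-check the collector's regex before wiring it in, you can verify a sample line against it in plain Java (the log line here is hypothetical sample data):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CollectorRegexCheck {
    public static void main(String[] args) {
        // The same regex used in the collector's regexAll decorator.
        Pattern p = Pattern.compile("([^,]*),([^,]*),([^,]*)");
        Matcher m = p.matcher("1,books,Hadoop Guide"); // hypothetical log line
        if (m.find()) {
            // Groups map positionally to the column names id, cat, name.
            System.out.println("id=" + m.group(1) + " cat=" + m.group(2)
                    + " name=" + m.group(3));
            // prints id=1 cat=books name=Hadoop Guide
        }
    }
}
```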

Note: Solr schema.xml must contain field entries for id, cat, and name. Otherwise an invalid column name error occurs and the document is not saved in Solr.

Reference: Flume-Solr Integration, from our contributor Ankit Jain, at Ankit Jain's blog.
