java - Spark with Cassandra input/output


Picture the following scenario: a Spark application (Java implementation) uses a Cassandra database to load data, convert it into an RDD, and process it. The application also streams new data from the database for processing via a custom receiver. The output of the streaming process is stored back in the database. The implementation uses Spring Data Cassandra for the database integration.

CassandraConfig:

@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {

    @Autowired
    private Environment env;

    @Bean
    public CassandraClusterFactoryBean cluster() {
        CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
        cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
        cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));
        return cluster;
    }

    @Bean
    public CassandraMappingContext mappingContext() {
        return new BasicCassandraMappingContext();
    }

    @Bean
    public CassandraConverter converter() {
        return new MappingCassandraConverter(mappingContext());
    }

    @Bean
    public CassandraSessionFactoryBean session() throws Exception {
        CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
        session.setCluster(cluster().getObject());
        session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
        session.setConverter(converter());
        session.setSchemaAction(SchemaAction.NONE);
        return session;
    }

    @Bean
    public CassandraOperations cassandraTemplate() throws Exception {
        return new CassandraTemplate(session().getObject());
    }
}
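The configuration above reads three keys from a cassandra.properties file on the classpath. A minimal sketch of that file could look like the following (the host, port, and keyspace values here are placeholders, not taken from the question):

```properties
# cassandra.properties (values are illustrative placeholders)
cassandra.contactpoints=127.0.0.1
cassandra.port=9042
cassandra.keyspace=my_keyspace
```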

DataProcessor.main method:

// Initialize the Spring application context
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class);
ApplicationContextHolder.setApplicationContext(applicationContext);
CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class);

// Initialize the Spark context
SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);

// Load the first data page
List<Event> pagingResults = cassandraOperations.select(
        "select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE,
        Event.class);
// Parallelize the first page
JavaRDD<Event> rddBuffer = sc.parallelize(pagingResults);

while (pagingResults != null && !pagingResults.isEmpty()) {
    Event lastEvent = pagingResults.get(pagingResults.size() - 1);
    pagingResults = cassandraOperations.select(
            "select * from event where event_type = 'event_type1' and creation_time < "
                    + lastEvent.getPk().getCreationTime()
                    + " order by creation_time desc limit " + DATA_PAGE_SIZE,
            Event.class);
    // Parallelize the page and union it with the existing buffer
    rddBuffer = rddBuffer.union(sc.parallelize(pagingResults));
}

// Data processing ...

A large amount of data is expected at initial loading. For this reason, the data is paginated, loaded, and distributed into rddBuffer page by page.
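The paging loop above can be isolated from Spark and Cassandra. The following self-contained sketch demonstrates the same pattern, keyset pagination by a descending sort key, using a plain list as a stand-in for the database; `fetchPage`, `PAGE_SIZE`, and the integer "creation times" are hypothetical stand-ins for the `cassandraOperations.select(...)` calls and `DATA_PAGE_SIZE` in the question:

```java
import java.util.ArrayList;
import java.util.List;

public class PagingSketch {
    static final int PAGE_SIZE = 3;
    // Pretend data source: events keyed by descending "creation time".
    static final List<Integer> SOURCE = List.of(9, 8, 7, 6, 5, 4, 3, 2, 1);

    // Return up to PAGE_SIZE items strictly older than 'before'
    // (stands in for the "creation_time < ..." CQL query).
    static List<Integer> fetchPage(int before) {
        List<Integer> page = new ArrayList<>();
        for (int v : SOURCE) {
            if (v < before && page.size() < PAGE_SIZE) {
                page.add(v);
            }
        }
        return page;
    }

    public static void main(String[] args) {
        List<Integer> buffer = new ArrayList<>();
        List<Integer> page = fetchPage(Integer.MAX_VALUE); // first page
        while (!page.isEmpty()) {
            buffer.addAll(page);                  // union with the buffer
            int last = page.get(page.size() - 1); // oldest event on this page
            page = fetchPage(last);               // next page: strictly older
        }
        System.out.println(buffer); // all events, newest first, no duplicates
    }
}
```

Because each page is fetched strictly below the last key of the previous page, no row is read twice and no offset counting is needed, which is why this pattern suits Cassandra's clustering-column queries.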

The following options are available:

  1. The Spark-Cassandra example (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/cassandracqltest.scala), although there is a minimal amount of documentation for this example.
  2. The Calliope project (http://tuplejump.github.io/calliope/)

I would like to know the best practice for integrating Spark with Cassandra. What would be the best option to follow in my implementation?

Apache Spark 1.0.0, Apache Cassandra 2.0.8

The easiest way to work with Cassandra and Spark is to use the official open-source Cassandra driver for Spark developed by DataStax: https://github.com/datastax/spark-cassandra-connector

This driver has been built on top of the Cassandra Java driver and provides a direct bridge between Cassandra and Spark. Unlike Calliope, it does not use the Hadoop interface. Additionally, it offers the following unique features:

  • Support for all Cassandra data types, including collections, out of the box
  • Lightweight mapping of Cassandra rows to custom classes or tuples without the need to use implicits or other advanced features in Scala
  • Saving RDDs to Cassandra
  • Full support for Cassandra virtual nodes
  • Ability to filter/select on the server side, e.g. leveraging Cassandra clustering columns or secondary indexes
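With the connector, the manual paging loop from the question can be replaced by a single table scan that Spark partitions across the cluster's token ranges. The following is an untested sketch only; the keyspace/table names ("ks", "event", "event_processed") are placeholders, the Event class is the one from the question, and the exact Java API (mapRowTo/mapToRow) is from connector versions 1.1+, so check the README for the API matching your connector and Spark versions:

```java
// Sketch: loading and saving a Cassandra table via the DataStax
// spark-cassandra-connector Java API, instead of manual pagination.
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ConnectorSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("test-spark")
                .setMaster("local[2]")
                // The connector reads the contact point from the Spark config
                // (host value is a placeholder).
                .set("spark.cassandra.connection.host", "127.0.0.1");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Full-table scan, split across token ranges by the connector;
        // the where(...) predicate is pushed down to Cassandra.
        JavaRDD<Event> events = javaFunctions(sc)
                .cassandraTable("ks", "event", mapRowTo(Event.class))
                .where("event_type = ?", "event_type1");

        // ... process events ...

        // Write results straight back to Cassandra.
        javaFunctions(events)
                .writerBuilder("ks", "event_processed", mapToRow(Event.class))
                .saveToCassandra();
    }
}
```

This removes the driver-side union of per-page RDDs entirely: each Spark partition reads its own slice of the table directly from the replicas owning it.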
