java - Spark with Cassandra input/output
Picture the following scenario: a Spark application (Java implementation) uses a Cassandra database to load data, convert it to an RDD and process it. The application also streams new data from the database, which is processed by a custom receiver, and the output of the streaming process is stored back in the database. The implementation uses the Spring Data Cassandra integration with the database.
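The custom receiver itself is not shown; below is a minimal sketch of what such a receiver could look like, assuming it polls Cassandra through the same CassandraOperations bean obtained from the Spring context. The class name, polling query, interval, and the ApplicationContextHolder.getApplicationContext() lookup are hypothetical and only illustrate the structure.

import java.util.List;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.springframework.data.cassandra.core.CassandraOperations;

// Hypothetical polling receiver: pushes newly arrived events into Spark Streaming.
public class CassandraEventReceiver extends Receiver<Event> {

    private final long pollIntervalMs;

    public CassandraEventReceiver(long pollIntervalMs) {
        super(StorageLevel.MEMORY_ONLY());
        this.pollIntervalMs = pollIntervalMs;
    }

    @Override
    public void onStart() {
        // Poll on a background thread so onStart() returns immediately.
        new Thread(new Runnable() {
            @Override
            public void run() {
                poll();
            }
        }).start();
    }

    @Override
    public void onStop() {
        // The polling loop checks isStopped(), nothing else to clean up here.
    }

    private void poll() {
        CassandraOperations ops = ApplicationContextHolder
                .getApplicationContext().getBean(CassandraOperations.class);
        long lastSeen = System.currentTimeMillis();
        while (!isStopped()) {
            // Hypothetical query: fetch events created after the last poll.
            List<Event> newEvents = ops.select(
                    "select * from event where event_type = 'event_type1' and creation_time > " + lastSeen,
                    Event.class);
            for (Event e : newEvents) {
                store(e); // hand each record to Spark Streaming
                lastSeen = Math.max(lastSeen, e.getPk().getCreationTime());
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException ignored) {
                // fall through and re-check isStopped()
            }
        }
    }
}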
CassandraConfig:
@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {

    @Autowired
    private Environment env;

    @Bean
    public CassandraClusterFactoryBean cluster() {
        CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
        cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
        cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));
        return cluster;
    }

    @Bean
    public CassandraMappingContext mappingContext() {
        return new BasicCassandraMappingContext();
    }

    @Bean
    public CassandraConverter converter() {
        return new MappingCassandraConverter(mappingContext());
    }

    @Bean
    public CassandraSessionFactoryBean session() throws Exception {
        CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
        session.setCluster(cluster().getObject());
        session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
        session.setConverter(converter());
        session.setSchemaAction(SchemaAction.NONE);
        return session;
    }

    @Bean
    public CassandraOperations cassandraTemplate() throws Exception {
        return new CassandraTemplate(session().getObject());
    }
}
DataProcessor.main method:
// Initialize Spring application context
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class);
ApplicationContextHolder.setApplicationContext(applicationContext);
CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class);

// Initialize Spark context
SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);

// Load the first page of data
List<Event> pagingResults = cassandraOperations.select(
        "select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE,
        Event.class);
// Parallelize the first page
JavaRDD<Event> rddBuffer = sc.parallelize(pagingResults);

while (pagingResults != null && !pagingResults.isEmpty()) {
    Event lastEvent = pagingResults.get(pagingResults.size() - 1);
    pagingResults = cassandraOperations.select(
            "select * from event where event_type = 'event_type1' and creation_time < " + lastEvent.getPk().getCreationTime()
                    + " order by creation_time desc limit " + DATA_PAGE_SIZE,
            Event.class);
    // Parallelize the page and add it to the existing RDD
    rddBuffer = rddBuffer.union(sc.parallelize(pagingResults));
}

// Data processing
...
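For reference, the Event entity used in the queries above is not included in the question; the following is a hypothetical sketch of what it could look like with Spring Data Cassandra mapping annotations, assuming event_type is the partition key and creation_time a descending clustering column (consistent with the WHERE and ORDER BY clauses above). The payload column and all names other than those two are made up.

import java.io.Serializable;
import org.springframework.cassandra.core.Ordering;
import org.springframework.cassandra.core.PrimaryKeyType;
import org.springframework.data.cassandra.mapping.Column;
import org.springframework.data.cassandra.mapping.PrimaryKey;
import org.springframework.data.cassandra.mapping.PrimaryKeyClass;
import org.springframework.data.cassandra.mapping.PrimaryKeyColumn;
import org.springframework.data.cassandra.mapping.Table;

// Hypothetical entity mapped to the "event" table.
@Table("event")
public class Event {

    // Composite primary key: event_type partitions the data, creation_time orders it.
    @PrimaryKeyClass
    public static class EventPk implements Serializable {

        @PrimaryKeyColumn(name = "event_type", ordinal = 0, type = PrimaryKeyType.PARTITIONED)
        private String eventType;

        @PrimaryKeyColumn(name = "creation_time", ordinal = 1, type = PrimaryKeyType.CLUSTERED,
                ordering = Ordering.DESCENDING)
        private long creationTime;

        public long getCreationTime() { return creationTime; }
    }

    @PrimaryKey
    private EventPk pk;

    @Column("payload")
    private String payload; // hypothetical data column

    public EventPk getPk() { return pk; }
}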
It is expected that there will be a big amount of data at the initial loading. For this reason the data is paginated, loaded and distributed in rddBuffer.
There are also the following options available:
- The Spark-Cassandra example (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/cassandracqltest.scala), although there is only a minimal amount of documentation for this example.
- The Calliope project (http://tuplejump.github.io/calliope/)
I would like to know what the best practice is for the integration of Spark with Cassandra. What would be the best option to follow in my implementation?
Apache Spark 1.0.0, Apache Cassandra 2.0.8
The easiest way to work with Cassandra and Spark is to use the official open source Cassandra driver for Spark developed by DataStax: https://github.com/datastax/spark-cassandra-connector
This driver has been built on top of the Cassandra Java driver and provides a direct bridge between Cassandra and Spark. Unlike Calliope, it does not use the Hadoop interface. Additionally, it offers the following unique features (a usage sketch follows the list):
- Support for all Cassandra data types, including collections, out of the box
- Lightweight mapping of Cassandra rows to custom classes or tuples without the need to use any implicits or other advanced features in Scala
- Saving RDDs to Cassandra
- Full support for Cassandra virtual nodes
- The ability to filter/select on the server side, e.g. leveraging Cassandra clustering columns or secondary indexes
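As a rough illustration (not part of the original answer), here is a minimal sketch of how the initial load and the write-back could look with the connector's Java API instead of the manual paging loop. The keyspace name foo, the output table event_out, and the bean mapping via mapRowTo/mapToRow are assumptions; the Java API (the japi package shown here) has changed between connector releases, and a bean with a composite @PrimaryKey like the Event sketched above may need a custom row mapping.

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf()
        .setAppName("test-spark")
        .setMaster("local[2]")
        .set("spark.cassandra.connection.host", "127.0.0.1"); // Cassandra contact point

JavaSparkContext sc = new JavaSparkContext(conf);

// Read the table as an RDD of Event beans; the connector handles paging and
// data locality itself, and the where() clause is pushed down to Cassandra.
JavaRDD<Event> events = javaFunctions(sc)
        .cassandraTable("foo", "event", mapRowTo(Event.class))
        .where("event_type = ?", "event_type1");

// ... data processing ...

// Save the processed RDD back to Cassandra.
javaFunctions(events)
        .writerBuilder("foo", "event_out", mapToRow(Event.class))
        .saveToCassandra();

With this approach the paging loop and the union of per-page RDDs in the question become unnecessary, since the connector splits the table into Spark partitions on its own.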