python - How to append a single row to a Cassandra table using PySpark?


I want to insert a new row into a Cassandra table that already exists. I'm using pyspark_cassandra.

Spark version - 1.4.1

Scala version - 2.10.6

Cassandra version - 2.2.3

Python version - 2.7.6

Python script -

from pyspark.conf import SparkConf
from pyspark_cassandra import CassandraSparkContext, Row
import pyspark_cassandra

# The Cassandra contact point is a bare host or IP address, not a URL
conf = SparkConf() \
    .setAppName("PySpark Cassandra Test") \
    .setMaster("spark://0.0.0.0:7077") \
    .set("spark.cassandra.connection.host", "127.0.0.1")

sc = CassandraSparkContext(conf=conf)

# A one-element RDD holding the single row to append
rdd = sc.parallelize([{
    "id": 101,
    "name": "ketan",
}])

# Save into keyspace "users", table "temp"
rdd.saveToCassandra("users", "temp")

Command used to run the above script -

sudo /usr/local/spark/bin/spark-submit --jars /path/to/pyspark-cassandra-assembly-<version>.jar \
    --driver-class-path /path/to/pyspark-cassandra-assembly-<version>.jar \
    --py-files /path/to/pyspark-cassandra-assembly-<version>.jar \
    --conf spark.cassandra.connection.host=your,cassandra,node,names \
    --master spark://spark-master:7077 \
    yourscript.py
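To spell out what each flag in that command does, here is a minimal sketch (assuming Python 3) that assembles the same spark-submit invocation as an argument list. The function name build_spark_submit_cmd, its parameters and the Cassandra host names are hypothetical; the jar path keeps the `<version>` placeholder from the command above. Nothing is executed, the list is only built and printed.

```python
import os
import shlex


def build_spark_submit_cmd(spark_home, assembly_jar, cassandra_hosts, master, script):
    """Return the spark-submit command above as an argument list (hypothetical helper)."""
    return [
        os.path.join(spark_home, "bin", "spark-submit"),
        # Ship the assembly jar to the cluster
        "--jars", assembly_jar,
        # Put it on the driver JVM's classpath as well
        "--driver-class-path", assembly_jar,
        # The jar is a zip that also contains the pyspark_cassandra Python package
        "--py-files", assembly_jar,
        # Comma-separated Cassandra contact points (hosts or IPs, not URLs)
        "--conf", "spark.cassandra.connection.host=" + ",".join(cassandra_hosts),
        "--master", master,
        script,
    ]


cmd = build_spark_submit_cmd(
    spark_home="/usr/local/spark",
    assembly_jar="/path/to/pyspark-cassandra-assembly-<version>.jar",
    cassandra_hosts=["cassandra-node-1", "cassandra-node-2"],  # hypothetical hosts
    master="spark://spark-master:7077",
    script="yourscript.py",
)
print(" ".join(shlex.quote(part) for part in cmd))
```

Keeping the command as a list avoids shell-quoting mistakes; it could be handed to subprocess.run(cmd) if you wanted to launch the job from Python.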

I'm stuck on the error below -

Traceback (most recent call last):
  File "/home/yourscript.py", line 13, in <module>
    rdd.saveToCassandra("users","temp")
  File "/usr/local/spark/pyspark-cassandra-master/target/scala-2.10/pyspark-cassandra-assembly-0.3.5.jar/pyspark_cassandra/rdd.py", line 83, in saveToCassandra
  File "/usr/local/spark/pyspark-cassandra-master/target/scala-2.10/pyspark-cassandra-assembly-0.3.5.jar/pyspark_cassandra/util.py", line 93, in helper
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o26.newInstance.
: java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror;
    at com.datastax.spark.connector.types.TypeConverter$.<init>(TypeConverter.scala:116)
    at com.datastax.spark.connector.types.TypeConverter$.<clinit>(TypeConverter.scala)
    at pyspark_cassandra.PythonHelper.<init>(PythonHelper.scala:36)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.lang.Class.newInstance(Class.java:442)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
17/04/06 15:15:33 INFO SparkContext: Invoking stop() from shutdown hook
17/04/06 15:15:33 INFO SparkUI: Stopped Spark web UI at http://192.168.195.119:4040
17/04/06 15:15:33 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/04/06 15:15:33 INFO MemoryStore: MemoryStore cleared
17/04/06 15:15:33 INFO BlockManager: BlockManager stopped
17/04/06 15:15:33 INFO BlockManagerMaster: BlockManagerMaster stopped
17/04/06 15:15:33 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/04/06 15:15:33 INFO SparkContext: Stopped SparkContext
17/04/06 15:15:33 INFO ShutdownHookManager: Shutdown hook called
17/04/06 15:15:33 INFO ShutdownHookManager: Deleting directory /tmp/spark-3d6cd846-4e37-47d4-b1e0-d9e68a7d34b3/pyspark-c2b7061a-44f2-4e1c-bf90-ea30b81c3c7d
17/04/06 15:15:33 INFO ShutdownHookManager: Deleting directory /tmp/spark-3d6cd846-4e37-47d4-b1e0-d9e68a7d34b3

I tried running the above code in the pyspark shell and it works fine for me: the data is inserted into the Cassandra table. But when I run it as a Python script, it gets stuck at this point.

Am I missing some configuration, or something else?

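This is a Scala version mismatch error: some of your components are built for Scala 2.10 and others for Scala 2.11. The traceback shows that the pyspark-cassandra assembly was built under target/scala-2.10, so it only works with a Spark distribution that was also built for Scala 2.10; when one side calls a scala.reflect method that does not exist in the other Scala version, the JVM throws NoSuchMethodError.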
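If you want to recognize this class of failure in a log, one heuristic is to look for a NoSuchMethodError whose missing method lives in the scala.* namespace, as in the traceback above. The helper below is a hypothetical sketch of that heuristic, not something provided by Spark or the connector, and a match is only a hint, not proof of a mismatch.

```python
import re

# Heuristic (an assumption, not an official rule): a NoSuchMethodError for a
# scala.* method usually means code compiled against one Scala binary version
# is running on another.
_SCALA_NSME = re.compile(r"java\.lang\.NoSuchMethodError:\s*(scala\.[\w.$]+)")


def scala_mismatch_method(trace_text):
    """Return the missing scala.* method named in the trace, or None."""
    match = _SCALA_NSME.search(trace_text)
    return match.group(1) if match else None


trace = (
    "py4j.protocol.Py4JJavaError: An error occurred while calling o26.newInstance.\n"
    ": java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror"
    "(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror;"
)
print(scala_mismatch_method(trace))
```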

See the Spark Cassandra Connector FAQ for more details.

The solution is to make sure all of the libraries you use are built for the same version of Scala.
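As a rough pre-flight check, you could scan the jar names or paths you pass to spark-submit for a Scala binary version marker (an `_2.xx` artifact suffix or an sbt `target/scala-2.xx` directory) and stop when more than one version shows up. This is a hypothetical helper that assumes your artifacts follow those naming conventions; jars without a marker are simply ignored, and the file names in the example are illustrative.

```python
import re

# Matches a Scala binary version as an artifact suffix ("..._2.11-...")
# or as an sbt target directory (".../target/scala-2.10/...").
_SCALA_VERSION = re.compile(r"(?:_|scala-)(2\.\d{1,2})(?=[-/.]|$)")


def scala_binary_versions(artifacts):
    """Map each Scala binary version found to the artifacts that carry it."""
    found = {}
    for name in artifacts:
        match = _SCALA_VERSION.search(name)
        if match:
            found.setdefault(match.group(1), []).append(name)
    return found


def check_same_scala(artifacts):
    """Return the single Scala version in use (or None); raise if they are mixed."""
    versions = scala_binary_versions(artifacts)
    if len(versions) > 1:
        raise ValueError("mixed Scala binary versions: %s" % sorted(versions))
    return next(iter(versions), None)


artifacts = [
    "/usr/local/spark/pyspark-cassandra-master/target/scala-2.10/pyspark-cassandra-assembly-0.3.5.jar",
    "spark-core_2.11.jar",  # hypothetical name for a Scala 2.11 Spark jar
]
try:
    check_same_scala(artifacts)
except ValueError as err:
    print(err)
```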

