python - How to append a single row to a Cassandra table using pyspark?
I want to insert a new row into an existing Cassandra table. I'm using pyspark_cassandra.
Spark version - 1.4.1
Scala version - 2.10.6
Cassandra version - 2.2.3
Python version - 2.7.6
Python script -
from pyspark.conf import SparkConf
from pyspark_cassandra import CassandraSparkContext, Row
import pyspark_cassandra

conf = SparkConf().setAppName("PySpark Cassandra Test") \
    .setMaster("spark://0.0.0.0:7077") \
    .set("spark.cassandra.connection.host", "127.0.0.1")  # host/IP only, no http:// scheme
sc = CassandraSparkContext(conf=conf)

rdd = sc.parallelize([{"id": 101, "name": "ketan"}])
rdd.saveToCassandra("users", "temp")
Command to run the above script -
sudo /usr/local/spark/bin/spark-submit \
  --jars /path/to/pyspark-cassandra-assembly-<version>.jar \
  --driver-class-path /path/to/pyspark-cassandra-assembly-<version>.jar \
  --py-files /path/to/pyspark-cassandra-assembly-<version>.jar \
  --conf spark.cassandra.connection.host=your,cassandra,node,names \
  --master spark://spark-master:7077 \
  yourscript.py
I got stuck because of the error below -
Traceback (most recent call last):
  File "/home/yourscript.py", line 13, in <module>
    rdd.saveToCassandra("users", "temp")
  File "/usr/local/spark/pyspark-cassandra-master/target/scala-2.10/pyspark-cassandra-assembly-0.3.5.jar/pyspark_cassandra/rdd.py", line 83, in saveToCassandra
  File "/usr/local/spark/pyspark-cassandra-master/target/scala-2.10/pyspark-cassandra-assembly-0.3.5.jar/pyspark_cassandra/util.py", line 93, in helper
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o26.newInstance.
: java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror;
    at com.datastax.spark.connector.types.TypeConverter$.<init>(TypeConverter.scala:116)
    at com.datastax.spark.connector.types.TypeConverter$.<clinit>(TypeConverter.scala)
    at pyspark_cassandra.PythonHelper.<init>(PythonHelper.scala:36)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.lang.Class.newInstance(Class.java:442)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
17/04/06 15:15:33 INFO SparkContext: Invoking stop() from shutdown hook
17/04/06 15:15:33 INFO SparkUI: Stopped Spark web UI at http://192.168.195.119:4040
17/04/06 15:15:33 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/04/06 15:15:33 INFO MemoryStore: MemoryStore cleared
17/04/06 15:15:33 INFO BlockManager: BlockManager stopped
17/04/06 15:15:33 INFO BlockManagerMaster: BlockManagerMaster stopped
17/04/06 15:15:33 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/04/06 15:15:33 INFO SparkContext: Stopped SparkContext
17/04/06 15:15:33 INFO ShutdownHookManager: Shutdown hook called
17/04/06 15:15:33 INFO ShutdownHookManager: Deleting directory /tmp/spark-3d6cd846-4e37-47d4-b1e0-d9e68a7d34b3/pyspark-c2b7061a-44f2-4e1c-bf90-ea30b81c3c7d
17/04/06 15:15:33 INFO ShutdownHookManager: Deleting directory /tmp/spark-3d6cd846-4e37-47d4-b1e0-d9e68a7d34b3
I tried executing the above code in the pyspark shell and it works fine for me; the data gets inserted into the Cassandra table. But when I run it as a Python script, it gets stuck at this point.
Am I missing some configuration, or is it something else?
This is a Scala version mismatch error: some of the components are built against Scala 2.10 while others are built against Scala 2.11.
See the Spark Cassandra Connector FAQ for more details.
The solution is to make sure all of the libraries use the same version of Scala.
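Scala-built jars conventionally encode the Scala version in the artifact name (e.g. the `_2.10` in `spark-cassandra-connector_2.10-1.4.1.jar`), so a quick way to spot a mismatch is to compare the suffixes of the jars on your classpath. Below is a minimal sketch that relies only on that naming convention; the jar names listed are hypothetical examples, not taken from the question:

```python
import re

def scala_suffix(jar_name):
    """Return the Scala version suffix (e.g. '2.10') embedded in a jar
    file name, or None if the name carries no such suffix."""
    m = re.search(r'_(\d+\.\d+)(?=[-.])', jar_name)
    return m.group(1) if m else None

# Hypothetical classpath: the connector targets Scala 2.11 while the
# Spark core jar is built against Scala 2.10 -- exactly the kind of
# mismatch that surfaces as a NoSuchMethodError at runtime.
jars = [
    "spark-cassandra-connector_2.11-1.4.1.jar",
    "spark-core_2.10-1.4.1.jar",
]
versions = {scala_suffix(j) for j in jars if scala_suffix(j)}
if len(versions) > 1:
    print("Scala version mismatch:", sorted(versions))
```

With Spark 1.4.1 (built against Scala 2.10, as the question's setup indicates), every connector artifact should carry the `_2.10` suffix.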