amazon s3 - Why can't I load a parquet file into a Spark DataFrame?


df = spark.read.load("s3://s3-cdp-prod-hive/novaya/impliciy_score_matrix/") 
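(For reference, `spark.read.load` uses parquet as the default data source, so the explicit forms below should be equivalent unless `spark.sql.sources.default` has been changed; shown only to make the format assumption visible.)

df = spark.read.format("parquet").load("s3://s3-cdp-prod-hive/novaya/impliciy_score_matrix/")
# or, equivalently
df = spark.read.parquet("s3://s3-cdp-prod-hive/novaya/impliciy_score_matrix/")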

I am trying to load parquet files from S3 into Spark on an EMR (HDFS) cluster. It gives me the error below:

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-486331500042535422.py", line 344, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-486331500042535422.py", line 337, in <module>
    exec(code)
  File "<stdin>", line 4, in <module>
  File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 149, in load
    return self._df(self._jreader.load(path))
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o73.load.
: com.amazon.ws.emr.hadoop.fs.consistency.exception.ConsistencyException: 3 items inconsistent (no s3 object associated metadata item). First object: /s3-cdp-prod-hive/novaya/impliciy_score_matrix/.hive-staging_hive_2017-03-30_15-12-27_966_3482598128600403763-456
    at com.amazon.ws.emr.hadoop.fs.consistency.ConsistencyCheckerS3FileSystem.listStatus(ConsistencyCheckerS3FileSystem.java:727)
    at com.amazon.ws.emr.hadoop.fs.consistency.ConsistencyCheckerS3FileSystem.listStatus(ConsistencyCheckerS3FileSystem.java:497)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy43.listStatus(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.s3n2.S3NativeFileSystem2.listStatus(S3NativeFileSystem2.java:192)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.listStatus(EmrFileSystem.java:339)
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$.org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$listLeafFiles(PartitioningAwareFileIndex.scala:392)
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$bulkListLeafFiles$1.apply(PartitioningAwareFileIndex.scala:302)
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$bulkListLeafFiles$1.apply(PartitioningAwareFileIndex.scala:301)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$.org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$bulkListLeafFiles(PartitioningAwareFileIndex.scala:301)
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.listLeafFiles(PartitioningAwareFileIndex.scala:253)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:74)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:50)
    at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$lzycompute$1(DataSource.scala:133)
    at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$tempFileIndex$1(DataSource.scala:124)
    at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$getOrInferFileFormatSchema(DataSource.scala:138)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:135)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

ERROR   Took 8 min 39 sec. Last updated by yanan.chen at April 06 2017, 10:57:48 AM. (outdated)

%pyspark
df.show()

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-1196174262473053317.py", line 344, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-1196174262473053317.py", line 337, in <module>
    exec(code)
  File "<stdin>", line 1, in <module>
NameError: name 'df' is not defined

ERROR   Took 9 min 28 sec. Last updated by taoyang1 at April 06 2017, 6:55:45 AM.

%pyspark
als = ALS(maxIter=20, rank=10, regParam=0.01, userCol="member_srl", itemCol="productid", ratingCol="rating", seed=123)
#(training, test) = df.randomSplit([0.8, 0.2])
#model = als.fit(training)
model = als.fit(df)

FINISHED   Took 2 min 12 sec. Last updated by yanan.chen at March 30 2017, 1:39:34 PM.

%pyspark
pred = model.transform(df)

FINISHED   Took 0 sec. Last updated by yanan.chen at March 30 2017, 1:40:18 PM.
%pyspark
pred.count()

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-1727164243295023279.py", line 344, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-1727164243295023279.py", line 337, in <module>
    exec(code)
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 380, in count
    return int(self._jdf.count())
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o416.count.
: org.apache.spark.SparkException: Job 19 cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:808)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:806)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1668)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1587)
    at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1826)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1283)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1825)
    at org.apache.spark.SparkContext$$anon$3.run(SparkContext.scala:1770)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:934)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:275)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2405)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2404)
    at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2778)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:2404)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

I have loaded tables stored as parquet before, but not one this large. Is the load failure due to the huge size of the table, which has 200 million rows? Does anyone know what's wrong and how to solve it?

You can try increasing the number of executors and the executor memory, for example:

pyspark --num-executors 4 --executor-memory 4g
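If you create the session yourself instead of launching the shell, the same resources can be requested on the session builder. A minimal sketch (the two config keys are standard Spark properties; the app name and the resource values are just examples to tune for your cluster):

from pyspark.sql import SparkSession

# Same idea as `pyspark --num-executors 4 --executor-memory 4g`,
# expressed as Spark properties set before the session starts.
spark = (
    SparkSession.builder
    .appName("load-implicit-score-matrix")      # hypothetical app name
    .config("spark.executor.instances", "4")    # --num-executors 4
    .config("spark.executor.memory", "4g")      # --executor-memory 4g
    .getOrCreate()
)

df = spark.read.load("s3://s3-cdp-prod-hive/novaya/impliciy_score_matrix/")

In Zeppelin these properties are normally set in the Spark interpreter settings instead, since the interpreter creates the context before your paragraph runs.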

