Wednesday, June 15, 2016

org.bson.BsonSerializationException: Size 30897469 is larger than MaxDocumentSize 16777216

I am using PySpark to do a word count. The data is stored in MongoDB and looks like this:

> db.stackin.find({})
{ "_id" : ObjectId("575ce909aa02c3b21f1be0bb"), "summary" : "good good day", "url" : "url_1" }
{ "_id" : ObjectId("575ce909aa02c3b21f1be0bc"), "summary" : "hello world good world", "url" : "url_2" }
{ "_id" : ObjectId("575ce909aa02c3b21f1be0bd"), "summary" : "hello world good hello good", "url" : "url_3" }
{ "_id" : ObjectId("575ce909aa02c3b21f1be0be"), "summary" : "hello world hello", "url" : "url_4" }

What I want is to get the word counts per URL.

So the expected result for the data above is:

{"good": [{"url_1": 2}, {"url_2": 1}, {"url_3": 2}]}
{"day": [{"url_1": 1}]}
{"hello": [{"url_2": 1}, {"url_3": 2}, {"url_4": 2}]}
{"world": [{"url_2": 2}, {"url_3": 1}, {"url_4": 1}]}
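In plain local Python (no Spark), the transformation I want looks like this sketch over the four sample documents:

```python
import collections

# The sample documents from the collection above, minus _id.
docs = [
    {"summary": "good good day", "url": "url_1"},
    {"summary": "hello world good world", "url": "url_2"},
    {"summary": "hello world good hello good", "url": "url_3"},
    {"summary": "hello world hello", "url": "url_4"},
]

# word -> list of {url: count} entries, one entry per document
# in which the word appears
result = collections.defaultdict(list)
for doc in docs:
    for word, count in collections.Counter(doc["summary"].split()).items():
        result[word].append({doc["url"]: count})
```
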

And here is my code:

import pyspark
import re
import collections
import pymongo_spark
pymongo_spark.activate()
rdd = sc.mongoPairRDD("mongodb://localhost/testmr.stackin")

def f(record):
    """Turn one document into a list of (word, {url: count}) pairs."""
    raw_summary = record[1]['summary']
    # Strip ASCII and full-width punctuation, then normalize newlines.
    summary = re.sub(u"[.!/,$%^*)(+\"']+|[+——!,。?、~@#¥%……&*()]+",
                     u"", raw_summary).replace("\r\n", " ").replace("\n", " ")
    url = record[1]['url']
    counts = dict(collections.Counter(summary.split()))
    return [(word, {url: count}) for word, count in counts.items()]

newrdd = rdd.flatMap(f)
sortrdd = newrdd.sortByKey()
resultrdd = sortrdd.groupByKey()
resultrdd.saveToMongoDB('mongodb://localhost:27017/testmr.stackout')

But when it saves to MongoDB, it reports the following error:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 8.0 failed 1 times, most recent failure: Lost task 4.0 in stage 8.0 (TID 328, localhost): org.bson.BsonSerializationException: Size 30897469 is larger than MaxDocumentSize 16777216.
    at org.bson.BsonBinaryWriter.backpatchSize(BsonBinaryWriter.java:367)
    at org.bson.BsonBinaryWriter.doWriteEndArray(BsonBinaryWriter.java:142)
    at org.bson.AbstractBsonWriter.writeEndArray(AbstractBsonWriter.java:338)
    at com.mongodb.DBObjectCodec.encodeIterable(DBObjectCodec.java:271)
    at com.mongodb.DBObjectCodec.writeValue(DBObjectCodec.java:198)
    at com.mongodb.DBObjectCodec.encode(DBObjectCodec.java:128)
    at com.mongodb.DBObjectCodec.encode(DBObjectCodec.java:61)
    at com.mongodb.CompoundDBObjectCodec.encode(CompoundDBObjectCodec.java:48)
    at com.mongodb.CompoundDBObjectCodec.encode(CompoundDBObjectCodec.java:27)
    at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:63)
    at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:29)
    at com.mongodb.connection.RequestMessage.addDocument(RequestMessage.java:253)
    at com.mongodb.connection.RequestMessage.addCollectibleDocument(RequestMessage.java:219)
    at com.mongodb.connection.InsertMessage.encodeMessageBodyWithMetadata(InsertMessage.java:73)
    at com.mongodb.connection.RequestMessage.encodeWithMetadata(RequestMessage.java:160)
    at com.mongodb.connection.WriteProtocol.execute(WriteProtocol.java:85)
    at com.mongodb.connection.InsertProtocol.execute(InsertProtocol.java:69)
    at com.mongodb.connection.InsertProtocol.execute(InsertProtocol.java:40)
    at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:159)
    at com.mongodb.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:286)
    at com.mongodb.connection.DefaultServerConnection.insert(DefaultServerConnection.java:73)
    at com.mongodb.operation.MixedBulkWriteOperation$Run$2.executeWriteProtocol(MixedBulkWriteOperation.java:450)
    at com.mongodb.operation.MixedBulkWriteOperation$Run$RunExecutor.execute(MixedBulkWriteOperation.java:655)
    at com.mongodb.operation.MixedBulkWriteOperation$Run.execute(MixedBulkWriteOperation.java:401)
    at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:179)
    at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:168)
    at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:230)
    at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:221)
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:168)
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:74)
    at com.mongodb.Mongo.execute(Mongo.java:782)
    at com.mongodb.Mongo$2.execute(Mongo.java:765)
    at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:2195)
    at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:2188)
    at com.mongodb.BulkWriteOperation.execute(BulkWriteOperation.java:121)
    at com.mongodb.hadoop.output.MongoOutputCommitter.commitTask(MongoOutputCommitter.java:176)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1122)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1091)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1922)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1146)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1074)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1074)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1074)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:994)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:985)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:985)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:985)
    at org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:782)
    at org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.bson.BsonSerializationException: Size 30897469 is larger than MaxDocumentSize 16777216.
    at org.bson.BsonBinaryWriter.backpatchSize(BsonBinaryWriter.java:367)
    at org.bson.BsonBinaryWriter.doWriteEndArray(BsonBinaryWriter.java:142)
    at org.bson.AbstractBsonWriter.writeEndArray(AbstractBsonWriter.java:338)
    at com.mongodb.DBObjectCodec.encodeIterable(DBObjectCodec.java:271)
    at com.mongodb.DBObjectCodec.writeValue(DBObjectCodec.java:198)
    at com.mongodb.DBObjectCodec.encode(DBObjectCodec.java:128)
    at com.mongodb.DBObjectCodec.encode(DBObjectCodec.java:61)
    at com.mongodb.CompoundDBObjectCodec.encode(CompoundDBObjectCodec.java:48)
    at com.mongodb.CompoundDBObjectCodec.encode(CompoundDBObjectCodec.java:27)
    at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:63)
    at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:29)
    at com.mongodb.connection.RequestMessage.addDocument(RequestMessage.java:253)
    at com.mongodb.connection.RequestMessage.addCollectibleDocument(RequestMessage.java:219)
    at com.mongodb.connection.InsertMessage.encodeMessageBodyWithMetadata(InsertMessage.java:73)
    at com.mongodb.connection.RequestMessage.encodeWithMetadata(RequestMessage.java:160)
    at com.mongodb.connection.WriteProtocol.execute(WriteProtocol.java:85)
    at com.mongodb.connection.InsertProtocol.execute(InsertProtocol.java:69)
    at com.mongodb.connection.InsertProtocol.execute(InsertProtocol.java:40)
    at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:159)
    at com.mongodb.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:286)
    at com.mongodb.connection.DefaultServerConnection.insert(DefaultServerConnection.java:73)
    at com.mongodb.operation.MixedBulkWriteOperation$Run$2.executeWriteProtocol(MixedBulkWriteOperation.java:450)
    at com.mongodb.operation.MixedBulkWriteOperation$Run$RunExecutor.execute(MixedBulkWriteOperation.java:655)
    at com.mongodb.operation.MixedBulkWriteOperation$Run.execute(MixedBulkWriteOperation.java:401)
    at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:179)
    at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:168)
    at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:230)
    at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:221)
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:168)
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:74)
    at com.mongodb.Mongo.execute(Mongo.java:782)
    at com.mongodb.Mongo$2.execute(Mongo.java:765)
    at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:2195)
    at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:2188)
    at com.mongodb.BulkWriteOperation.execute(BulkWriteOperation.java:121)
    at com.mongodb.hadoop.output.MongoOutputCommitter.commitTask(MongoOutputCommitter.java:176)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1122)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1091)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
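For context, 16777216 bytes is MongoDB's 16 MiB maximum BSON document size, so it seems one of my grouped word documents exceeds that limit. A rough way to sanity-check document sizes locally with only the standard library (using the JSON length as an approximation of the BSON size) is:

```python
import json

MAX_DOC = 16 * 1024 * 1024  # MongoDB's 16 MiB BSON document limit

# A grouped document shaped like my output: one word with many
# {url: count} entries (the URL names here are made up).
doc = {"hello": [{"url_%d" % i: 1} for i in range(100000)]}

# The JSON length roughly tracks the BSON size, so it works
# as a coarse sanity check before attempting the insert.
approx_size = len(json.dumps(doc))
```
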

My Spark environment is standalone, not a cluster. My machine has 4 GB of RAM and 4 CPUs.

Would this error also happen in a cluster environment? And how should I fix it?
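One workaround I am considering (I am not sure it is the right approach) is splitting each word's URL list into fixed-size chunks so that no single output document can grow past the limit. A minimal sketch, using a hypothetical chunk_value helper:

```python
def chunk_value(word, url_counts, chunk_size=10000):
    """Split one (word, [{url: count}, ...]) pair into several
    smaller documents, each holding at most chunk_size entries."""
    items = list(url_counts)
    for i in range(0, len(items), chunk_size):
        yield {"word": word, "urls": items[i:i + chunk_size]}

# 25000 fake entries -> three documents of 10000, 10000 and 5000
docs = list(chunk_value("hello", [{"url_%d" % n: 1} for n in range(25000)]))
```
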

Thank you!
