I have a Spark job that transforms incoming data from compressed text files into Parquet format and loads it into a daily partition of a Hive table. This is a typical job in a data lake; it is quite simple, but in my case it was very slow. Initially it took about 4 hours to convert ~2,100 input .gz files (~1.9 TB of data) into Parquet, while the actual Spark job took just 38 minutes to run and the remaining time was spent on loading the data into a Hive partition.

Let's see what the reason for this behavior is and how we can improve the performance.

Loading Into Static Partition – Initial Version

Initially, the PySpark script to load data was built around the following statement (a fuller sketch appears at the end of this section):

    INSERT OVERWRITE TABLE events PARTITION (event_dt = '')

From the Spark driver log you can see the Spark job progress:

    19/12/26 09:05:16 INFO SparkContext: Running Spark version 2.1.1
    19/12/26 09:05:35 INFO DirectFileOutputCommitter: Nothing to setup since the outputs are
    19/12/26 09:05:35 INFO FileInputFormat: Total input paths to process : 2126
    19/12/26 09:05:35 INFO YarnScheduler: Adding task set 0.0 with 2126 tasks
    19/12/26 09:05:40 INFO TaskSetManager: Starting task 0.0 in stage 0.0
    19/12/26 09:15:54 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (executor 2) (201/2126)
    19/12/26 09:37:22 INFO TaskSetManager: Finished task 2125.0 in stage 0.0 (executor 339) (1992/2126)
    19/12/26 09:43:53 INFO TaskSetManager: Finished task 2059.0 in stage 0.0 (executor 322) (2126/2126)
    19/12/26 09:43:53 INFO DAGScheduler: Job 0 finished: sql at NativeMethodAccessorImpl.java:0,

You can see that it took Spark just 38 minutes to complete all 2,126 tasks. After that the Spark driver started renaming the files:

    19/12/26 09:43:53 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
    19/12/26 09:43:54 INFO S3NativeFileSystem: rename

Output files generated by the Spark tasks are moved from the staging directory into the final destination. This is a sequential process performed by the Spark driver, which renames the files one by one. A file rename is quite a long operation in S3, since it requires copying and then deleting the file, so the time taken is proportional to the file size. In my case the rename of 2,126 files (~2 TB) took 3 hours 9 minutes (5.3 seconds per file, or 182.4 MB/sec on average).

Let's consider two phases of job execution, task commit and job commit, which can help us understand how they affect the performance of a Spark job.

Task commit is the process by which a task makes its result visible to the job driver. When loading into a Hive table from Spark, the output committer option is taken from the /etc/hadoop/conf/mapred-site.xml configuration file (see the sketch at the end of this section). Looking at an executor log, you can see:

    19/12/26 09:05:40 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
    19/12/26 09:05:43 INFO S3NativeFileSystem: DirectFileOutputCommitter

This means that all tasks write their outputs directly to the task output directory.
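The mapred-site.xml snippet itself is not shown above, so here is a rough sketch of what the direct-committer configuration typically looks like on EMR clusters of this generation. The property names below are an assumption on my part, not taken from this post; check your cluster's actual /etc/hadoop/conf/mapred-site.xml:

```xml
<!-- Hedged sketch only: property names assumed from EMR's classic
     DirectFileOutputCommitter setup, not copied from this post.
     When enabled, task outputs go straight to the final S3 location
     instead of a staging directory. -->
<property>
  <name>mapred.output.direct.EmrFileSystem</name>
  <value>true</value>
</property>
<property>
  <name>mapred.output.direct.NativeS3FileSystem</name>
  <value>true</value>
</property>
```

With these set to true, the DirectFileOutputCommitter writes each task's output directly to its final destination, which is exactly the behavior the executor log above shows.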
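Similarly, only the INSERT statement survives from the initial PySpark script. A minimal sketch consistent with it might look like the following; the source path, input format, and view name are hypothetical, and the partition value stays a placeholder because it is elided above:

```python
# Minimal sketch of the initial load script. Only the events table, the
# event_dt partition column, and the INSERT OVERWRITE pattern come from the
# post; the app name, source path, format, and view name are hypothetical.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("load_events")
         .enableHiveSupport()
         .getOrCreate())

# Read the compressed text input; Spark decompresses .gz files transparently.
incoming = spark.read.text("s3://mybucket/incoming/")  # hypothetical path
incoming.createOrReplaceTempView("events_incoming")

# Static-partition load: every row goes into a single daily partition.
# The partition value is a placeholder; the original post elides it.
spark.sql("""
    INSERT OVERWRITE TABLE events PARTITION (event_dt = '...')
    SELECT * FROM events_incoming
""")
```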
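The rename statistics quoted earlier are easy to sanity-check with back-of-the-envelope arithmetic; the small difference from the post's 182.4 MB/sec comes from the rounded "~2 TB" figure:

```python
# Sanity check of the sequential-rename numbers quoted in the post.
files = 2126
seconds = 3 * 3600 + 9 * 60   # 3 hours 9 minutes of renaming
total_bytes = 2 * 1024**4     # "~2 TB", assumed binary terabytes

print(round(seconds / files, 1))                  # -> 5.3 seconds per file
print(round(total_bytes / seconds / 1024**2, 1))  # -> ~184.9 MB/sec
```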
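Finally, the driver log line "File Output Committer Algorithm version is 1" explains why all the renames happen sequentially in the driver at job commit. This excerpt ends before any fix is described, but a common mitigation in Spark on Hadoop is committer algorithm version 2, which moves files at task commit, in parallel across executors. A hedged sketch of enabling it, not necessarily what this post ends up recommending:

```python
# Hedged sketch: FileOutputCommitter algorithm version 2 moves task output
# to the final location at task commit (in parallel on the executors)
# instead of sequentially in the driver at job commit. This is a standard
# Spark/Hadoop setting, but it is not confirmed as the fix used here.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("load_events")
         .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
         .enableHiveSupport()
         .getOrCreate())
```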