/*
 * Copyright 2011-2013 10gen Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.mongodb.hadoop.output;

import com.mongodb.BasicDBObject;
import com.mongodb.BulkUpdateRequestBuilder;
import com.mongodb.BulkWriteOperation;
import com.mongodb.BulkWriteRequestBuilder;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoException;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.io.MongoUpdateWritable;
import com.mongodb.hadoop.io.MongoWritableTypes;
import com.mongodb.hadoop.util.MongoConfigUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

/**
 * An {@link OutputCommitter} that commits task output to MongoDB. Output
 * records are staged in a temporary file while the task runs, then read back
 * and applied to the output collection as bulk operations when the task
 * commits.
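 *
 * <p>A minimal job wiring sketch (assuming this committer is supplied by
 * {@code MongoOutputFormat}; the URI and job name below are illustrative
 * only):
 *
 * <pre>
 * Configuration conf = new Configuration();
 * // Hypothetical target collection; replace with a real URI.
 * MongoConfigUtil.setOutputURI(
 *     conf, "mongodb://localhost:27017/mydb.mycollection");
 * Job job = Job.getInstance(conf, "mongo-output-example");
 * job.setOutputFormatClass(MongoOutputFormat.class);
 * </pre>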
 */
public class MongoOutputCommitter extends OutputCommitter {

    // Name of the scratch directory segment under which a task attempt's
    // serialized output is staged before being committed to MongoDB.
    public static final String TEMP_DIR_NAME = "_MONGO_OUT_TEMP";
    private static final Log LOG = LogFactory.getLog(MongoOutputCommitter.class);
    private DBCollection collection;

    public MongoOutputCommitter() {}

    @Override
    public void setupJob(final JobContext jobContext) {
        LOG.info("Setting up job.");
    }

    @Override
    public void setupTask(final TaskAttemptContext taskContext) {
        LOG.info("Setting up task.");
    }

    @Override
    public boolean needsTaskCommit(
      final TaskAttemptContext taskContext) throws IOException {
        try {
            FileSystem fs = FileSystem.get(taskContext.getConfiguration());
            // Commit is only necessary if there was any output.
            return fs.exists(getTaskAttemptPath(taskContext));
        } catch (IOException e) {
            LOG.error("Could not open filesystem", e);
            throw e;
        }
    }

    @Override
    public void commitTask(
      final TaskAttemptContext taskContext) throws IOException {
        LOG.info("Committing task.");

        collection =
          MongoConfigUtil.getOutputCollection(taskContext.getConfiguration());

        // Get temporary file.
        Path tempFilePath = getTaskAttemptPath(taskContext);
        LOG.info("Committing from temporary file: " + tempFilePath.toString());
        long filePos = 0, fileLen;
        FSDataInputStream inputStream = null;
        try {
            FileSystem fs = FileSystem.get(taskContext.getConfiguration());
            inputStream = fs.open(tempFilePath);
            fileLen = fs.getFileStatus(tempFilePath).getLen();
        } catch (IOException e) {
            LOG.error("Could not open temporary file for committing", e);
            cleanupAfterCommit(inputStream, taskContext);
            throw e;
        }

        int maxDocs = MongoConfigUtil.getBatchSize(
          taskContext.getConfiguration());
        int curBatchSize = 0;

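        // With a batch size of, say, 1000, a task that staged 2500 operations
        // commits them in three batches of 1000, 1000, and 500 documents.
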
        // Remember the ordered/unordered setting so that batches
        // re-initialized below use the same semantics.
        boolean orderedBulkOp =
          MongoConfigUtil.isBulkOrdered(taskContext.getConfiguration());
        BulkWriteOperation bulkOp;
        if (orderedBulkOp) {
            bulkOp = collection.initializeOrderedBulkOperation();
        } else {
            bulkOp = collection.initializeUnorderedBulkOperation();
        }
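
        // Note: an ordered bulk operation stops applying writes at the first
        // error, whereas an unordered one attempts every queued write and
        // reports failures only after the whole batch has run.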

        // Read Writables out of the temporary file.
        BSONWritable bw = new BSONWritable();
        MongoUpdateWritable muw = new MongoUpdateWritable();
        while (filePos < fileLen) {
            try {
                // Determine writable type, and perform corresponding operation
                // on MongoDB.
                int mwType = inputStream.readInt();
                if (MongoWritableTypes.BSON_WRITABLE == mwType) {
                    bw.readFields(inputStream);
                    bulkOp.insert(new BasicDBObject(bw.getDoc().toMap()));
                } else if (MongoWritableTypes.MONGO_UPDATE_WRITABLE == mwType) {
                    muw.readFields(inputStream);
                    DBObject query = new BasicDBObject(muw.getQuery().toMap());
                    DBObject modifiers =
                        new BasicDBObject(muw.getModifiers().toMap());
                    BulkWriteRequestBuilder writeBuilder = bulkOp.find(query);
                    if (muw.isReplace()) {
                        writeBuilder.replaceOne(modifiers);
                    } else if (muw.isUpsert()) {
                        BulkUpdateRequestBuilder updateBuilder =
                          writeBuilder.upsert();
                        if (muw.isMultiUpdate()) {
                            updateBuilder.update(modifiers);
                        } else {
                            updateBuilder.updateOne(modifiers);
                        }
                    } else {
                        // Non-upsert update.
                        if (muw.isMultiUpdate()) {
                            writeBuilder.update(modifiers);
                        } else {
                            writeBuilder.updateOne(modifiers);
                        }
                    }
                } else {
                    throw new IOException("Unrecognized type: " + mwType);
                }
                filePos = inputStream.getPos();
                // Write to MongoDB if the batch is full, or if this is the
                // last operation to be performed for the task.
                if (++curBatchSize >= maxDocs || filePos >= fileLen) {
                    try {
                        bulkOp.execute();
                    } catch (MongoException e) {
                        LOG.error("Could not write to MongoDB", e);
                        throw e;
                    }
                    // Start a new batch with the same ordered/unordered
                    // semantics as before.
                    bulkOp = orderedBulkOp
                      ? collection.initializeOrderedBulkOperation()
                      : collection.initializeUnorderedBulkOperation();
                    curBatchSize = 0;

                    // Signal progress back to Hadoop framework so that we
                    // don't time out.
                    taskContext.progress();
                }
            } catch (IOException e) {
                LOG.error("Error reading from temporary file", e);
                throw e;
            }
        }

        cleanupAfterCommit(inputStream, taskContext);
    }

    @Override
    public void abortTask(final TaskAttemptContext taskContext)
      throws IOException {
        LOG.info("Aborting task.");
        cleanupResources(taskContext);
    }

    /**
     * Helper method to close the FSDataInputStream, clean up any files
     * still left around from map/reduce tasks, and close the MongoClient.
     *
     * @param inputStream the FSDataInputStream to close; may be null.
     * @param context the TaskAttemptContext for the committing task.
     * @throws IOException if the stream cannot be closed or the temporary
     * files cannot be deleted.
     */
    private void cleanupAfterCommit(
        final FSDataInputStream inputStream,
        final TaskAttemptContext context)
        throws IOException {
        if (inputStream != null) {
            try {
                inputStream.close();
            } catch (IOException e) {
                LOG.error("Could not close input stream", e);
                throw e;
            }
        }
        cleanupResources(context);
    }

    /**
     * Delete the task attempt's temporary output files, walking up from the
     * attempt path to (but not including) the base temporary directory, then
     * close the connection to MongoDB.
     *
     * @param taskContext the TaskAttemptContext whose files should be removed.
     * @throws IOException if a temporary file cannot be deleted.
     */
    private void cleanupResources(
      final TaskAttemptContext taskContext)
        throws IOException {
        Path currentPath = getTaskAttemptPath(taskContext);
        Path tempDirectory = getTempDirectory(taskContext.getConfiguration());
        FileSystem fs = FileSystem.get(taskContext.getConfiguration());
        while (!currentPath.equals(tempDirectory)) {
            try {
                fs.delete(currentPath, true);
            } catch (IOException e) {
                LOG.error("Could not delete temporary file: " + currentPath, e);
                throw e;
            }
            currentPath = currentPath.getParent();
        }

        if (collection != null) {
            MongoConfigUtil.close(collection.getDB().getMongo());
        }
    }

    private static Path getTempDirectory(final Configuration config) {
        String basePath = config.get(
          "mapreduce.task.tmp.dir",
          config.get(
            "mapred.child.tmp",
            config.get("hadoop.tmp.dir", "/tmp")));
        return new Path(basePath);
    }
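
    // Illustrative resolution: if neither mapreduce.task.tmp.dir nor
    // mapred.child.tmp is set but hadoop.tmp.dir=/tmp/hadoop, the base
    // directory resolves to /tmp/hadoop; with none of the keys set, it
    // falls back to /tmp.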

    /**
     * Get the Path to where temporary files should be stored for a
     * TaskAttempt, whose TaskAttemptContext is provided.
     *
     * @param context the TaskAttemptContext.
     * @return the Path to the temporary file for the TaskAttempt.
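     *
     * <p>For example (illustrative values only): with {@code hadoop.tmp.dir}
     * set to {@code /tmp/hadoop} and a task attempt id of
     * {@code attempt_201301011200_0001_m_000000_0}, the returned path is
     * {@code /tmp/hadoop/attempt_201301011200_0001_m_000000_0/_MONGO_OUT_TEMP/_out}.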
     */
    public static Path getTaskAttemptPath(
      final TaskAttemptContext context) {
        Configuration config = context.getConfiguration();
        // Try to use the following base temporary directories, in this order:
        // 1. New-style option for task tmp dir
        // 2. Old-style option for task tmp dir
        // 3. Hadoop system-wide tmp dir
        // 4. /tmp
        // Hadoop Paths always use "/" as a directory separator.
        return new Path(
          String.format("%s/%s/%s/_out",
            getTempDirectory(config),
            context.getTaskAttemptID().toString(), TEMP_DIR_NAME));
    }

}