func decodeJSONBody(w http.ResponseWriter, r *http.Request, dst interface{}) error {
    if r.Header.Get("Content-Type") != "" {
        value, _ := header.ParseValueAndParams(r.Header, "Content-Type")
        if value != "application/json" {
            msg := "Content-Type header is not application/json"
            return &malformedRequest{status: http.StatusUnsupportedMediaType, msg: msg}
        }
    }
    // Limit the size of the request body to 1 MiB (1048576 bytes).
    r.Body = http.MaxBytesReader(w, r.Body, 1048576)
    dec := json.NewDecoder(r.Body)
    dec.DisallowUnknownFields()
    err := dec.Decode(&dst)
    if err != nil {
        var syntaxError *json.SyntaxError
        var unmarshalTypeError *json.UnmarshalTypeError
        switch {
        case errors.As(err, &syntaxError):
            msg := fmt.Sprintf("Request body contains badly-formed JSON (at position %d)", syntaxError.Offset)
            return &malformedRequest{status: http.StatusBadRequest, msg: msg}
        ......
        }
    }
    err = dec.Decode(&struct{}{})
    if err != io.EOF {
        msg := "Request body must only contain a single JSON object"
        return &malformedRequest{status: http.StatusBadRequest, msg: msg}
    }

    return nil
}
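The malformedRequest error type referenced above is not shown in this listing. A minimal sketch of that type, and of a handler that consumes decodeJSONBody, might look as follows; the anonymous Person-style struct and the handler name are illustrative assumptions, not part of the listing above.

type malformedRequest struct {
    status int
    msg    string
}

func (mr *malformedRequest) Error() string {
    return mr.msg
}

// Hypothetical handler showing the intended calling convention:
// decode into a destination struct, then map a *malformedRequest to
// its embedded HTTP status, and everything else to a generic 500.
func personCreate(w http.ResponseWriter, r *http.Request) {
    var p struct {
        Name string `json:"name"`
        Age  int    `json:"age"`
    }

    if err := decodeJSONBody(w, r, &p); err != nil {
        var mr *malformedRequest
        if errors.As(err, &mr) {
            http.Error(w, mr.msg, mr.status)
        } else {
            http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
        }
        return
    }

    fmt.Fprintf(w, "Person: %+v", p)
}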
func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions) (
    fn ObjReaderFn, off, length int64, err error,
) {
    ......

    // Calculate range to read (different for encrypted/compressed objects)
    switch {
    case isCompressed:
        ......
    case isEncrypted:
        ......
        // We define a closure that performs decryption given
        // a reader that returns the desired range of
        // encrypted bytes. The header parameter is used to
        // provide encryption parameters.
        fn = func(inputReader io.Reader, h http.Header, cFns ...func()) (r *GetObjectReader, err error) {
            copySource := h.Get(xhttp.AmzServerSideEncryptionCopyCustomerAlgorithm) != ""
            // Attach decrypter on inputReader
            var decReader io.Reader
            decReader, err = DecryptBlocksRequestR(inputReader, h, seqNumber, partStart, oi, copySource)
            if err != nil {
                // Call the cleanup funcs
                for i := len(cFns) - 1; i >= 0; i-- {
                    cFns[i]()
                }
                return nil, err
            }
            oi.ETag = getDecryptedETag(h, oi, false)
            // Apply the skipLen and limit on the
            // decrypted stream
            decReader = io.LimitReader(ioutil.NewSkipReader(decReader, skipLen), decRangeLength)
            // Assemble the GetObjectReader
            r = &GetObjectReader{
                ObjInfo:    oi,
                Reader:     decReader,
                cleanUpFns: cFns,
                opts:       opts,
            }
            return r, nil
        }
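NewGetObjectReader does not itself read anything: it returns the closure together with the raw offset and length the caller must fetch. A hedged sketch of a call site, assuming openRawRange is a hypothetical helper returning an io.ReadCloser positioned at that raw range, and respHeader is a hypothetical http.Header carrying the encryption parameters:

// Sketch only: fn, off, and length come from NewGetObjectReader; the
// caller fetches the raw (still encrypted/compressed) byte range
// [off, off+length) and hands it to fn, which layers decryption and
// trimming on top and registers cleanup funcs to run on Close.
fn, off, length, err := NewGetObjectReader(rs, objInfo, opts)
if err != nil {
    return err
}

rawReader := openRawRange(off, length) // hypothetical helper, returns io.ReadCloser

gr, err := fn(rawReader, respHeader, func() {
    rawReader.Close() // cleanup funcs are invoked in reverse order
})
if err != nil {
    return err
}
defer gr.Close()

// gr now yields exactly the decrypted bytes the client requested.
_, err = io.Copy(w, gr)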
if !fi.DataShardFixed() {
    diskMTime := pickValidDiskTimeWithQuorum(metaArr, fi.Erasure.DataBlocks)
    if !diskMTime.Equal(timeSentinel) && !diskMTime.IsZero() {
        for index := range onlineDisks {
            if onlineDisks[index] == OfflineDisk {
                continue
            }
            if !metaArr[index].IsValid() {
                continue
            }
            if !metaArr[index].AcceptableDelta(diskMTime, shardDiskTimeDelta) {
                // If the disk mTime mismatches, it is considered outdated
                // https://github.com/minio/minio/pull/13803
                //
                // This check is only active if we could find maximally
                // occurring disk mtimes that are roughly the same across
                // the quorum, allowing us to skip those shards which we
                // think are wrong.
                onlineDisks[index] = OfflineDisk
            }
        }
    }
}

......

fn, off, length, err := NewGetObjectReader(rs, objInfo, opts)
if err != nil {
    return nil, err
}
unlockOnDefer = false
// once we have obtained a common FileInfo, i.e. the latest, we should stick
// to a single dataDir to read the content, to avoid reading from some other
// dataDir that has a stale FileInfo{}; this ensures that we fail appropriately
// during reads and expect the same dataDir everywhere.
dataDir := fi.DataDir
for ; partIndex <= lastPartIndex; partIndex++ {
    if length == totalBytesRead {
        break
    }
    partNumber := fi.Parts[partIndex].Number
    // Save the current part name and size.
    partSize := fi.Parts[partIndex].Size
    partLength := partSize - partOffset
    // partLength should be adjusted so that we don't write more data than what was requested.
    if partLength > (length - totalBytesRead) {
        partLength = length - totalBytesRead
    }
    tillOffset := erasure.ShardFileOffset(partOffset, partLength, partSize)
    // Get the checksums of the current part.
    readers := make([]io.ReaderAt, len(onlineDisks))
    prefer := make([]bool, len(onlineDisks))
    for index, disk := range onlineDisks {
        if disk == OfflineDisk {
            continue
        }
        if !metaArr[index].IsValid() {
            continue
        }
        checksumInfo := metaArr[index].Erasure.GetChecksumInfo(partNumber)
        partPath := pathJoin(object, dataDir, fmt.Sprintf("part.%d", partNumber))
        readers[index] = newBitrotReader(disk, metaArr[index].Data, bucket, partPath, tillOffset,
            checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize())
        // Prefer local disks
        prefer[index] = disk.Hostname() == ""
    }
    written, err := erasure.Decode(ctx, writer, readers, partOffset, partLength, partSize, prefer)
    // Note: we should not defer the following closeBitrotReaders() call, since
    // we are inside a for loop; if we used defer, we would accumulate many
    // open files by the time we return from this function.
    closeBitrotReaders(readers)
    if err != nil {
        // If we have successfully written all the content that was asked
        // for by the client, but we still see an error, this means that we
        // have some parts or data blocks missing or corrupted; attempt a
        // heal so they are repaired for future calls.
        if written == partLength {
            var scan madmin.HealScanMode
            switch {
            case errors.Is(err, errFileNotFound):
                scan = madmin.HealNormalScan
            case errors.Is(err, errFileCorrupt):
                scan = madmin.HealDeepScan
            }
            switch scan {
            case madmin.HealNormalScan, madmin.HealDeepScan:
                healOnce.Do(func() {
                    if _, healing := er.getOnlineDisksWithHealing(); !healing {
                        go healObject(bucket, object, fi.VersionID, scan)
                    }
                })
                // Healing has been triggered, and we have successfully
                // written the content for this part to the client, so we
                // should nil this error and proceed, instead of returning
                // an error.
                err = nil
            }
        }
        if err != nil {
            return toObjectErr(err, bucket, object)
        }
    }
    for i, r := range readers {
        if r == nil {
            onlineDisks[i] = OfflineDisk
        }
    }
    // Track total bytes read from disk and written to the client.
    totalBytesRead += partLength
    // partOffset is valid only for the first part, hence reset it to 0 for
    // the remaining parts.
    partOffset = 0
} // End of read all parts loop.

// Return success.
return nil
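Stripped of erasure decoding and healing, the loop's offset bookkeeping reduces to a small planning function. A self-contained sketch under the assumption that partOffset falls inside the first listed part (the names here are ours, not MinIO's):

// planPartReads emits (partIndex, offsetInPart, readLength) tuples for
// reading `length` bytes starting `partOffset` bytes into the first part,
// mirroring the clamping and offset-reset logic of the loop above.
func planPartReads(partSizes []int64, partOffset, length int64) [][3]int64 {
    var plan [][3]int64
    var totalBytesRead int64
    for i := 0; i < len(partSizes) && totalBytesRead < length; i++ {
        partLength := partSizes[i] - partOffset
        // Clamp so we never read more than the client asked for.
        if partLength > length-totalBytesRead {
            partLength = length - totalBytesRead
        }
        plan = append(plan, [3]int64{int64(i), partOffset, partLength})
        totalBytesRead += partLength
        // The offset applies only to the first part read.
        partOffset = 0
    }
    return plan
}

For example, with two 5 MiB parts, partOffset of 3 MiB, and length of 4 MiB, this yields (part 0, offset 3 MiB, read 2 MiB) followed by (part 1, offset 0, read 2 MiB), matching how the real loop resets partOffset after the first iteration.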