fix potential resource leak (#1922)
rows() returns a closeable resource, which Using can protect.
However, the current logic calls rows() twice and closes it only once.
This commit fixes that leak.

Co-authored-by: Yu Zhang <yzhang999272@apple.com>
yu-shipit and Yu Zhang authored Jan 13, 2025
1 parent d7beb06 commit 7c620d6
Showing 3 changed files with 53 additions and 49 deletions.
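For context, the bug shape named in the commit message can be sketched outside FiloDB. Everything below (the Rows class and the rows() factory) is hypothetical scaffolding, not FiloDB code; only the Using.resource pattern mirrors what the commit adopts:

import scala.util.Using

// Hypothetical stand-in for RangeVector.rows(): a closeable row iterator.
final class Rows(data: Seq[Int]) extends AutoCloseable {
  private val it = data.iterator
  var closed = false
  def hasNext: Boolean = it.hasNext
  def next(): Int = it.next()
  def close(): Unit = closed = true
}

object LeakSketch extends App {
  def rows(): Rows = new Rows(Seq(1, 2, 3))  // each call creates a NEW resource

  // Buggy shape fixed by this commit: the iterator that is consumed is never
  // closed; the close() in the finally block lands on a second, fresh iterator.
  val consumed = rows()
  while (consumed.hasNext) consumed.next()
  rows().close()            // closes the wrong instance
  assert(!consumed.closed)  // the consumed iterator leaked

  // Fixed shape: a single rows() call, closed even if the body throws.
  val kept = rows()
  Using.resource(kept) { rs => while (rs.hasNext) rs.next() }
  assert(kept.closed)
}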
27 changes: 13 additions & 14 deletions core/src/main/scala/filodb.core/query/RangeVector.scala
@@ -296,11 +296,11 @@ object RepeatValueVector extends StrictLogging {
     val startNs = Utils.currentThreadCpuTimeNanos
     try {
       ChunkMap.validateNoSharedLocks(execPlan)
-      Using(rv.rows()){
+      Using.resource(rv.rows()){
         rows =>
           val nextRow = if (rows.hasNext) Some(rows.next()) else None
           new RepeatValueVector(rv.key, startMs, stepMs, endMs, nextRow, schema)
-      }.get
+      }
     } finally {
       ChunkMap.releaseAllSharedLocks()
       queryStats.getCpuNanosCounter(Nil).addAndGet(Utils.currentThreadCpuTimeNanos - startNs)
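A note on the change above: scala.util.Using.apply wraps the body's result in a Try, which is why the old code needed .get to unwrap it; Using.resource returns the value directly and rethrows any exception after closing the resource. A minimal illustration using the standard library's Source (not FiloDB code):

import scala.io.Source
import scala.util.{Try, Using}

// Using(...) yields Try[A]; failures are captured, and .get rethrows them.
val wrapped: Try[String] = Using(Source.fromString("abc"))(_.mkString)

// Using.resource(...) yields A directly; no .get needed, close() still guaranteed.
val direct: String = Using.resource(Source.fromString("abc"))(_.mkString)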
@@ -573,20 +573,19 @@ object SerializedRangeVector extends StrictLogging {
     val startRecordNo = oldContainerOpt.map(_.numRecords).getOrElse(0)
     try {
       ChunkMap.validateNoSharedLocks(execPlan)
-      val rows = rv.rows
-      while (rows.hasNext) {
-        val nextRow = rows.next()
-        // Don't encode empty / NaN data over the wire
-        if (!canRemoveEmptyRows(rv.outputRange, schema) ||
-            schema.columns(1).colType == DoubleColumn && !java.lang.Double.isNaN(nextRow.getDouble(1)) ||
-            schema.columns(1).colType == HistogramColumn && !nextRow.getHistogram(1).isEmpty) {
-          numRows += 1
-          builder.addFromReader(nextRow, schema, 0)
-        }
-      }
+      Using.resource(rv.rows()) {
+        rows => while (rows.hasNext) {
+          val nextRow = rows.next()
+          // Don't encode empty / NaN data over the wire
+          if (!canRemoveEmptyRows(rv.outputRange, schema) ||
+            schema.columns(1).colType == DoubleColumn && !java.lang.Double.isNaN(nextRow.getDouble(1)) ||
+            schema.columns(1).colType == HistogramColumn && !nextRow.getHistogram(1).isEmpty) {
+            numRows += 1
+            builder.addFromReader(nextRow, schema, 0)
+          }
+        }
+      }
     } finally {
-      rv.rows().close()
-      // clear exec plan
       // When the query is done, clean up lingering shared locks caused by iterator limit.
       ChunkMap.releaseAllSharedLocks()
     }
CountValuesRowAggregator.scala
@@ -1,6 +1,7 @@
 package filodb.query.exec.aggregator
 
 import scala.collection.mutable
+import scala.util.Using
 
 import filodb.core.Utils
 import filodb.core.binaryrecord2.RecordBuilder
@@ -94,21 +94,22 @@ class CountValuesRowAggregator(label: String, limit: Int = 1000) extends RowAggr
       FiloSchedulers.assertThreadName(QuerySchedName)
       // aggRangeVector.rows.take below triggers the ChunkInfoIterator which requires lock/release
       ChunkMap.validateNoSharedLocks(s"CountValues-$label")
-      aggRangeVector.rows.take(limit).foreach { row =>
-        val rowMap = CountValuesSerDeser.deserialize(row.getBlobBase(1),
-          row.getBlobNumBytes(1), row.getBlobOffset(1))
-        rowMap.foreach { (k, v) =>
-          val rvk = CustomRangeVectorKey(aggRangeVector.key.labelValues +
-            (label.utf8 -> k.toString.utf8))
-          val builder = resRvs.getOrElseUpdate(rvk, SerializedRangeVector.newBuilder())
-          builder.startNewRecord(recSchema)
-          builder.addLong(row.getLong(0))
-          builder.addDouble(v)
-          builder.endRecord()
-        }
-      }
+      Using.resource(aggRangeVector.rows()) {
+        rows => rows.take(limit).foreach { row =>
+          val rowMap = CountValuesSerDeser.deserialize(row.getBlobBase(1),
+            row.getBlobNumBytes(1), row.getBlobOffset(1))
+          rowMap.foreach { (k, v) =>
+            val rvk = CustomRangeVectorKey(aggRangeVector.key.labelValues +
+              (label.utf8 -> k.toString.utf8))
+            val builder = resRvs.getOrElseUpdate(rvk, SerializedRangeVector.newBuilder())
+            builder.startNewRecord(recSchema)
+            builder.addLong(row.getLong(0))
+            builder.addDouble(v)
+            builder.endRecord()
+          }
+        }
+      }
     } finally {
-      aggRangeVector.rows.close()
       ChunkMap.releaseAllSharedLocks()
     }
     resRvs.map { case (key, builder) =>
TopBottomKRowAggregator.scala
@@ -4,6 +4,7 @@ import java.util.concurrent.TimeUnit

 import scala.collection.{mutable, Iterator}
 import scala.collection.mutable.ListBuffer
+import scala.util.Using
 
 import com.typesafe.scalalogging.StrictLogging
 
@@ -125,33 +126,35 @@ class TopBottomKRowAggregator(k: Int, bottomK: Boolean) extends RowAggregator wi
     FiloSchedulers.assertThreadName(QuerySchedName)
     ChunkMap.validateNoSharedLocks(s"TopkQuery-$k-$bottomK")
     // We limit the results wherever it is materialized first. So it is done here.
-    val rows = aggRangeVector.rows.take(limit)
-    val it = Iterator.from(0, rangeParams.stepSecs.toInt)
-      .takeWhile(_ <= (rangeParams.endSecs - rangeParams.startSecs)).map { t =>
-      val timestamp = t + rangeParams.startSecs
-      val rvkSeen = new ListBuffer[RangeVectorKey]
-      if (rows.hasNext) {
-        val row = rows.next()
-        var i = 1
-        while (row.notNull(i)) {
-          if (row.filoUTF8String(i) != CustomRangeVectorKey.emptyAsZcUtf8) {
-            val key = row.filoUTF8String(i)
-            val rvk = CustomRangeVectorKey.fromZcUtf8(key)
-            rvkSeen += rvk
-            val builder = resRvs.getOrElseUpdate(rvk, createBuilder(rangeParams, timestamp))
-            addRecordToBuilder(builder, TimeUnit.SECONDS.toMillis(timestamp), row.getDouble(i + 1))
-          }
-          i += 2
-        }
-        resRvs.keySet.foreach { rvs =>
-          if (!rvkSeen.contains(rvs)) addRecordToBuilder(resRvs(rvs), timestamp * 1000, Double.NaN)
-        }
-      }
-    }
-    // address step == 0 case
-    if (rangeParams.startSecs == rangeParams.endSecs || rangeParams.stepSecs == 0) it.take(1).toList else it.toList
+    Using.resource(aggRangeVector.rows()) {
+      rs => val rows = rs.take(limit)
+        val it = Iterator.from(0, rangeParams.stepSecs.toInt)
+          .takeWhile(_ <= (rangeParams.endSecs - rangeParams.startSecs)).map { t =>
+          val timestamp = t + rangeParams.startSecs
+          val rvkSeen = new ListBuffer[RangeVectorKey]
+          if (rows.hasNext) {
+            val row = rows.next()
+            var i = 1
+            while (row.notNull(i)) {
+              if (row.filoUTF8String(i) != CustomRangeVectorKey.emptyAsZcUtf8) {
+                val key = row.filoUTF8String(i)
+                val rvk = CustomRangeVectorKey.fromZcUtf8(key)
+                rvkSeen += rvk
+                val builder = resRvs.getOrElseUpdate(rvk, createBuilder(rangeParams, timestamp))
+                addRecordToBuilder(builder, TimeUnit.SECONDS.toMillis(timestamp), row.getDouble(i + 1))
+              }
+              i += 2
+            }
+            resRvs.keySet.foreach { rvs =>
+              if (!rvkSeen.contains(rvs)) addRecordToBuilder(resRvs(rvs), timestamp * 1000, Double.NaN)
+            }
+          }
+        }
+        // address step == 0 case
+        if (rangeParams.startSecs == rangeParams.endSecs || rangeParams.stepSecs == 0)
+          it.take(1).toList else it.toList
+    }
   } finally {
-    aggRangeVector.rows().close()
     ChunkMap.releaseAllSharedLocks()
   }
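Across all three files the pattern is the same: a single rows() call, scoped by Using.resource. The close-exactly-once guarantee can be sanity-checked outside the codebase with a hypothetical counting resource (not FiloDB code):

import scala.util.Using

// Hypothetical resource that records how many times it is closed.
final class CountingResource extends AutoCloseable {
  var closes = 0
  def close(): Unit = closes += 1
}

val r = new CountingResource
val out = Using.resource(r) { _ => 42 }  // the body's result is passed through
assert(out == 42 && r.closes == 1)       // closed exactly once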

