Skip to content

Commit

Permalink
Merge integration to master for 0.9.5
Browse files Browse the repository at this point in the history
  • Loading branch information
sherali42 authored Mar 31, 2020
2 parents 256c74b + 95c7cbf commit e148ba2
Show file tree
Hide file tree
Showing 116 changed files with 2,522 additions and 1,543 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -353,25 +353,24 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
}
}

private def hour(millis: Long = System.currentTimeMillis()) = millis / 1000 / 60 / 60

def writePartKeys(ref: DatasetRef,
shard: Int,
partKeys: Observable[PartKeyRecord],
diskTTLSeconds: Int,
diskTTLSeconds: Int, updateHour: Long,
writeToPkUTTable: Boolean = true): Future[Response] = {
val pkTable = getOrCreatePartitionKeysTable(ref, shard)
val pkByUTTable = getOrCreatePartitionKeysByUpdateTimeTable(ref)
val updateHour = hour()
val span = Kamon.spanBuilder("write-part-keys").asChildOf(Kamon.currentSpan()).start()
val ret = partKeys.mapAsync(writeParallelism) { pk =>
val ttl = if (pk.endTime == Long.MaxValue) -1 else diskTTLSeconds
val split = pk.hash.get % pkByUTNumSplits // caller needs to supply hash for partKey - cannot be None
// caller needs to supply hash for partKey - cannot be None
// Logical & MaxValue needed to make split positive by zeroing sign bit
val split = (pk.hash.get & Int.MaxValue) % pkByUTNumSplits
val writePkFut = pkTable.writePartKey(pk, ttl).flatMap {
case resp if resp == Success
&& writeToPkUTTable => pkByUTTable.writePartKey(shard, updateHour, split, pk, pkByUTTtlSeconds)

case resp => Future.successful(resp)
case resp if resp == Success && writeToPkUTTable =>
pkByUTTable.writePartKey(shard, updateHour, split, pk, pkByUTTtlSeconds)
case resp =>
Future.successful(resp)
}
Task.fromFuture(writePkFut).map{ resp =>
sinkStats.partKeysWrite(1)
Expand All @@ -386,11 +385,16 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
shard: Int,
updateHour: Long): Observable[PartKeyRecord] = {
val pkByUTTable = getOrCreatePartitionKeysByUpdateTimeTable(ref)
Observable.fromIterable(0 to pkByUTNumSplits)
Observable.fromIterable(0 until pkByUTNumSplits)
.flatMap { split => pkByUTTable.scanPartKeys(shard, updateHour, split) }
}
}

/**
* FIXME this works only for Murmur3Partitioner because it generates
* Long based tokens. If other partitioners are used, this can potentially break.
* Correct way is to pass Token objects so CQL stmts can bind tokens with stmt.bind().setPartitionKeyToken(token)
*/
case class CassandraTokenRangeSplit(tokens: Seq[(String, String)],
replicas: Set[InetSocketAddress]) extends ScanSplit {
// NOTE: You need both the host string and the IP address for Spark's locality to work
Expand Down Expand Up @@ -477,14 +481,14 @@ trait CassandraChunkSource extends RawChunkSource with StrictLogging {

def getOrCreateIngestionTimeIndexTable(dataset: DatasetRef): IngestionTimeIndexTable = {
indexTableCache.getOrElseUpdate(dataset,
{ dataset: DatasetRef =>
new IngestionTimeIndexTable(dataset, clusterConnector)(readEc) })
}
{ dataset: DatasetRef =>
new IngestionTimeIndexTable(dataset, clusterConnector, ingestionConsistencyLevel)(readEc) })
}

def getOrCreatePartitionKeysByUpdateTimeTable(dataset: DatasetRef): PartitionKeysByUpdateTimeTable = {
partKeysByUTTableCache.getOrElseUpdate(dataset,
{ dataset: DatasetRef =>
new PartitionKeysByUpdateTimeTable(dataset, clusterConnector)(readEc) })
new PartitionKeysByUpdateTimeTable(dataset, clusterConnector, ingestionConsistencyLevel)(readEc) })
}

def getOrCreatePartitionKeysTable(dataset: DatasetRef, shard: Int): PartitionKeysTable = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import filodb.core.store.ChunkSinkStats
* using TimeSeriesChunksTable. This is because the chunks are likely to be fetched from
* Cassandra due to locality when using TimeSeriesChunksTable, and the chunks table is smaller.
*/
sealed class IngestionTimeIndexTable(val dataset: DatasetRef, val connector: FiloCassandraConnector)
sealed class IngestionTimeIndexTable(val dataset: DatasetRef,
val connector: FiloCassandraConnector,
writeConsistencyLevel: ConsistencyLevel)
(implicit ec: ExecutionContext) extends BaseDatasetTable {
import scala.collection.JavaConverters._

Expand All @@ -36,9 +38,26 @@ sealed class IngestionTimeIndexTable(val dataset: DatasetRef, val connector: Fil
|) WITH compression = {
'sstable_compression': '$sstableCompression'}""".stripMargin

lazy val allCql = session.prepare(
s"SELECT ingestion_time, start_time, info FROM $tableString " +
s" WHERE partition = ?")
private lazy val writeIndexCql = session.prepare(
s"INSERT INTO $tableString (partition, ingestion_time, start_time, info) " +
s"VALUES (?, ?, ?, ?) USING TTL ?")
.setConsistencyLevel(writeConsistencyLevel)

private lazy val allCql = session.prepare(
s"SELECT ingestion_time, start_time, info FROM $tableString " +
s"WHERE partition = ?")
.setConsistencyLevel(ConsistencyLevel.ONE)

private lazy val scanCql1 = session.prepare(
s"SELECT partition, ingestion_time, start_time, info FROM $tableString " +
s"WHERE TOKEN(partition) >= ? AND TOKEN(partition) < ? AND ingestion_time >= ? AND ingestion_time <= ? " +
s"ALLOW FILTERING")
.setConsistencyLevel(ConsistencyLevel.ONE)

private lazy val scanCql2 = session.prepare(
s"SELECT partition FROM $tableString " +
s"WHERE TOKEN(partition) >= ? AND TOKEN(partition) < ? AND ingestion_time >= ? AND ingestion_time <= ? " +
s"ALLOW FILTERING")
.setConsistencyLevel(ConsistencyLevel.ONE)

/**
Expand All @@ -63,36 +82,40 @@ sealed class IngestionTimeIndexTable(val dataset: DatasetRef, val connector: Fil
def scanRowsByIngestionTimeNoAsync(tokens: Seq[(String, String)],
ingestionTimeStart: Long,
ingestionTimeEnd: Long): Iterator[Row] = {
def cql(start: String, end: String): String =
s"SELECT partition, ingestion_time, start_time, info FROM $tableString " +
s"WHERE TOKEN(partition) >= $start AND TOKEN(partition) < $end " +
s"AND ingestion_time >= $ingestionTimeStart AND ingestion_time <= $ingestionTimeEnd " +
s"ALLOW FILTERING"

tokens.iterator.flatMap { case (start, end) =>
session.execute(cql(start, end)).iterator.asScala
/*
* FIXME conversion of tokens to Long works only for Murmur3Partitioner because it generates
* Long based tokens. If other partitioners are used, this can potentially break.
* Correct way is to pass Token objects around and bind tokens with stmt.bind().setPartitionKeyToken(token)
*/
val stmt = scanCql1.bind(start.toLong: java.lang.Long,
end.toLong: java.lang.Long,
ingestionTimeStart: java.lang.Long,
ingestionTimeEnd: java.lang.Long)
session.execute(stmt).iterator.asScala
}
}

def scanPartKeysByIngestionTime(tokens: Seq[(String, String)],
ingestionTimeStart: Long,
ingestionTimeEnd: Long): Observable[ByteBuffer] = {
def cql(start: String, end: String): String =
s"SELECT partition FROM $tableString WHERE TOKEN(partition) >= $start AND TOKEN(partition) < $end " +
s"AND ingestion_time >= $ingestionTimeStart AND ingestion_time <= $ingestionTimeEnd " +
s"ALLOW FILTERING"
val it = tokens.iterator.flatMap { case (start, end) =>
session.execute(cql(start, end)).iterator.asScala
/*
* FIXME conversion of tokens to Long works only for Murmur3Partitioner because it generates
* Long based tokens. If other partitioners are used, this can potentially break.
* Correct way is to pass Token objects around and bind tokens with stmt.bind().setPartitionKeyToken(token)
*/
val stmt = scanCql2.bind(start.toLong: java.lang.Long,
end.toLong: java.lang.Long,
ingestionTimeStart: java.lang.Long,
ingestionTimeEnd: java.lang.Long)
session.execute(stmt).iterator.asScala
.map { row => row.getBytes("partition") }
}
Observable.fromIterator(it).handleObservableErrors
}

lazy val writeIndexCql = session.prepare(
s"INSERT INTO $tableString (partition, ingestion_time, start_time, info) " +
"VALUES (?, ?, ?, ?) USING TTL ?")
.setConsistencyLevel(ConsistencyLevel.ONE)

/**
* Writes new records to the ingestion table.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import filodb.core.{DatasetRef, Response}
import filodb.core.store.PartKeyRecord

sealed class PartitionKeysByUpdateTimeTable(val dataset: DatasetRef,
val connector: FiloCassandraConnector)
val connector: FiloCassandraConnector,
writeConsistencyLevel: ConsistencyLevel)
(implicit ec: ExecutionContext) extends BaseDatasetTable {

import filodb.cassandra.Util._
Expand All @@ -32,11 +33,16 @@ sealed class PartitionKeysByUpdateTimeTable(val dataset: DatasetRef,
| WITH compression = {'chunk_length_in_kb': '16', 'sstable_compression': '$sstableCompression'}""".stripMargin
// TODO time window compaction since we have time component in the primary key

lazy val writePartitionKeyCql =
session.prepare(
s"INSERT INTO ${tableString} (shard, epochHour, split, partKey, startTime, endTime) " +
s"VALUES (?, ?, ?, ?, ?, ?) USING TTL ?")
.setConsistencyLevel(ConsistencyLevel.ONE)
private lazy val writePartitionKeyCql = session.prepare(
s"INSERT INTO $tableString (shard, epochHour, split, partKey, startTime, endTime) " +
s"VALUES (?, ?, ?, ?, ?, ?) USING TTL ?")
.setConsistencyLevel(writeConsistencyLevel)

private lazy val readCql = session.prepare(
s"SELECT * FROM $tableString " +
s"WHERE shard = ? AND epochHour = ? AND split = ? ")
.setConsistencyLevel(ConsistencyLevel.ONE)


def writePartKey(shard: Int, updateHour: Long, split: Int,
pk: PartKeyRecord, ttlSeconds: Int): Future[Response] = {
Expand All @@ -45,8 +51,6 @@ sealed class PartitionKeysByUpdateTimeTable(val dataset: DatasetRef,
toBuffer(pk.partKey), pk.startTime: JLong, pk.endTime: JLong, ttlSeconds: JInt))
}

lazy val readCql = session.prepare(s"SELECT * FROM $tableString " +
s"WHERE shard = ? AND epochHour = ? AND split = ? ")
def scanPartKeys(shard: Int, updateHour: Long, split: Int): Observable[PartKeyRecord] = {
session.executeAsync(readCql.bind(shard: JInt, updateHour: JLong, split: JInt))
.toObservable.handleObservableErrors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,21 @@ sealed class PartitionKeysTable(val dataset: DatasetRef,
| PRIMARY KEY (partKey)
|) WITH compression = {'chunk_length_in_kb': '16', 'sstable_compression': '$sstableCompression'}""".stripMargin

lazy val writePartitionCql =
session.prepare(
s"INSERT INTO ${tableString} (partKey, startTime, endTime) VALUES (?, ?, ?) USING TTL ?")
private lazy val writePartitionCql = session.prepare(
s"INSERT INTO ${tableString} (partKey, startTime, endTime) " +
s"VALUES (?, ?, ?) USING TTL ?")
.setConsistencyLevel(writeConsistencyLevel)

lazy val writePartitionCqlNoTtl =
session.prepare(
s"INSERT INTO ${tableString} (partKey, startTime, endTime) VALUES (?, ?, ?)")
private lazy val writePartitionCqlNoTtl = session.prepare(
s"INSERT INTO ${tableString} (partKey, startTime, endTime) " +
s"VALUES (?, ?, ?)")
.setConsistencyLevel(writeConsistencyLevel)

private lazy val scanCql = session.prepare(
s"SELECT * FROM $tableString " +
s"WHERE TOKEN(partKey) >= ? AND TOKEN(partKey) < ?")
.setConsistencyLevel(ConsistencyLevel.ONE)

def writePartKey(pk: PartKeyRecord, diskTimeToLiveSeconds: Int): Future[Response] = {
if (diskTimeToLiveSeconds <= 0) {
connector.execStmtWithRetries(writePartitionCqlNoTtl.bind(
Expand All @@ -50,8 +55,6 @@ sealed class PartitionKeysTable(val dataset: DatasetRef,
}
}

lazy val scanCql = session.prepare(s"SELECT * FROM $tableString " +
s"WHERE TOKEN(partKey) >= ? AND TOKEN(partKey) < ?")
def scanPartKeys(tokens: Seq[(String, String)], shard: Int): Observable[PartKeyRecord] = {
val res: Observable[Iterator[PartKeyRecord]] = Observable.fromIterable(tokens)
.mapAsync { range =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,36 @@ sealed class TimeSeriesChunksTable(val dataset: DatasetRef,
|) WITH compression = {
'sstable_compression': '$sstableCompression'}""".stripMargin

lazy val writeChunksCql = session.prepare(
s"""INSERT INTO $tableString (partition, chunkid, info, chunks
|) VALUES (?, ?, ?, ?) USING TTL ?""".stripMargin
).setConsistencyLevel(writeConsistencyLevel)
private lazy val writeChunksCql = session.prepare(
s"INSERT INTO $tableString (partition, chunkid, info, chunks) " +
s"VALUES (?, ?, ?, ?) USING TTL ?")
.setConsistencyLevel(writeConsistencyLevel)

private lazy val readChunkInCql = session.prepare(
s"SELECT info, chunks FROM $tableString " +
s"WHERE partition = ? AND chunkid IN ?")
.setConsistencyLevel(ConsistencyLevel.ONE)

private lazy val readChunksCql = session.prepare(
s"SELECT chunkid, info, chunks FROM $tableString " +
s"WHERE partition = ? AND chunkid IN ?")
.setConsistencyLevel(ConsistencyLevel.ONE)

private lazy val readAllChunksCql = session.prepare(
s"SELECT chunkid, info, chunks FROM $tableString " +
s"WHERE partition = ?")
.setConsistencyLevel(ConsistencyLevel.ONE)

private lazy val scanBySplit = session.prepare(
s"SELECT partition, info, chunks FROM $tableString " +
s"WHERE TOKEN(partition) >= ? AND TOKEN(partition) < ?")
.setConsistencyLevel(ConsistencyLevel.ONE)

private lazy val readChunkRangeCql = session.prepare(
s"SELECT partition, info, chunks FROM $tableString " +
s"WHERE partition IN ? AND chunkid >= ? AND chunkid < ?")
.setConsistencyLevel(ConsistencyLevel.ONE)


def writeChunks(partition: Array[Byte],
chunkInfo: ChunkSetInfo,
Expand Down Expand Up @@ -81,12 +107,6 @@ sealed class TimeSeriesChunksTable(val dataset: DatasetRef,
)
}

lazy val readChunkInCql = session.prepare(
s"""SELECT info, chunks FROM $tableString WHERE
| partition = ?
| AND chunkid IN ?""".stripMargin)
.setConsistencyLevel(ConsistencyLevel.ONE)

/**
* Reads and returns a single RawPartData, raw data for a single partition/time series
*/
Expand All @@ -109,10 +129,6 @@ sealed class TimeSeriesChunksTable(val dataset: DatasetRef,
RawChunkSet(row.getBytes(infoIndex).array, chunks)
}

lazy val readChunksCql = session.prepare(
s"SELECT chunkid, info, chunks FROM $tableString WHERE partition = ? AND chunkid IN ?")
.setConsistencyLevel(ConsistencyLevel.ONE)

/**
* Reads raw chunk set Rows consisting of:
*
Expand All @@ -129,10 +145,6 @@ sealed class TimeSeriesChunksTable(val dataset: DatasetRef,
session.execute(query)
}

lazy val readAllChunksCql = session.prepare(
s"SELECT chunkid, info, chunks FROM $tableString WHERE partition = ?")
.setConsistencyLevel(ConsistencyLevel.ONE)

/**
* Test method which returns the same results as the readChunks method. Not async-friendly.
*/
Expand All @@ -143,12 +155,6 @@ sealed class TimeSeriesChunksTable(val dataset: DatasetRef,
/**
* Reads and returns a stream of RawPartDatas given a range of chunkIDs from multiple partitions
*/
lazy val readChunkRangeCql = session.prepare(
s"""SELECT partition, info, chunks FROM $tableString WHERE
| partition IN ?
| AND chunkid >= ? AND chunkid < ?""".stripMargin)
.setConsistencyLevel(ConsistencyLevel.ONE)

def readRawPartitionRange(partitions: Seq[Array[Byte]],
startTime: Long,
endTimeExclusive: Long): Observable[RawPartData] = {
Expand Down Expand Up @@ -177,10 +183,15 @@ sealed class TimeSeriesChunksTable(val dataset: DatasetRef,
}

def scanPartitionsBySplit(tokens: Seq[(String, String)]): Observable[RawPartData] = {
def cql(start: String, end: String): String =
s"SELECT partition, info, chunks FROM $tableString WHERE TOKEN(partition) >= $start AND TOKEN(partition) < $end "

val res: Observable[Future[Iterator[RawPartData]]] = Observable.fromIterable(tokens).map { case (start, end) =>
session.executeAsync(cql(start, end)).toIterator.handleErrors
/*
* FIXME conversion of tokens to Long works only for Murmur3Partitioner because it generates
* Long based tokens. If other partitioners are used, this can potentially break.
* Correct way to bind tokens is to do stmt.bind().setPartitionKeyToken(token)
*/
val stmt = scanBySplit.bind(start.toLong: java.lang.Long, end.toLong: java.lang.Long)
session.executeAsync(stmt).toIterator.handleErrors
.map { rowIt =>
rowIt.map { row => (row.getBytes(0), chunkSetFromRow(row, 1)) }
.sortedGroupBy(_._1)
Expand Down
Loading

0 comments on commit e148ba2

Please sign in to comment.