Add scalafmtSbt settings and CI Check
Other repos use certain settings for formatting sbt files. This commit adds the same settings here, plus a CI check for formatting, and applies the formatter.
colmsnowplow committed Jun 13, 2024
1 parent 43e30b7 commit f9c0f4f
Showing 172 changed files with 1,343 additions and 1,333 deletions.
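
To reproduce the formatting run and the new CI check locally, the standard sbt-scalafmt tasks can be used (a suggested invocation; this assumes the build defines no custom aliases for these tasks):

    sbt scalafmtAll scalafmtSbt             # apply formatting to Scala sources and to the sbt build sources
    sbt scalafmtCheckAll scalafmtSbtCheck   # check only, without rewriting files; these are the tasks the CI workflow runs
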
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -20,7 +20,7 @@ jobs:
java-version: 11
- name: Check Scala formatting
if: ${{ always() }}
run: sbt scalafmtCheckAll
run: sbt scalafmtCheckAll scalafmtSbtCheck
- name: Run tests
run: sbt coverage test coverageReport
- name: Aggregate coverage data
24 changes: 24 additions & 0 deletions .scalafmt.conf
@@ -3,6 +3,30 @@ runner.dialect = scala212
align.preset = none
align.openParenCallSite = false
align.arrowEnumeratorGenerator = true
align.tokens = [
{
code = "=>"
owners = [{
regex = "Case"
}]
},
{
code = "="
owners = []
},
{
code = "%"
owners = [{
regex = "Term.ApplyInfix"
}]
},
{
code = "%%"
owners = [{
regex = "Term.ApplyInfix"
}]
}
]
maxColumn = 140
docstrings.style = Asterisk
docstrings.wrap = yes
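
To illustrate the effect of the align.tokens rules added above, here is a small hypothetical Scala snippet (not taken from this repository) showing how scalafmt aligns "=" assignments and case "=>" arrows under these settings; the "%" / "%%" alignment of infix dependency operators can be seen directly in the build.sbt hunks that follow.

    // Hypothetical illustration of the align.tokens settings added above
    object AlignmentExample {
      // "=" tokens are aligned across neighbouring definitions
      val retryBaseMillis = 100
      val retryAttempts   = 5

      // "=>" tokens owned by Case are aligned within a match
      def describe(n: Option[Int]): String =
        n match {
          case Some(value) => s"got $value"
          case None        => "got nothing"
        }
    }
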
36 changes: 20 additions & 16 deletions build.sbt
@@ -10,20 +10,22 @@
*/

/**
* Inspired by the same issue in the Iglu Server, see commit https://github.com/snowplow/iglu-server/pull/128/commits/d4a0940c12181dd40bc5cdc2249f2c84c5296353
*
* Currently we have some libs that depend on circe 0.14.x and some that depend on 0.13.x.
* These reported binary incompatibilities can only be removed once we have bumped cats-effect to version 3.
* For now, we ignore the reported binary incompatibilities because testing shows it is safe.
* Inspired by the same issue in the Iglu Server, see commit
* https://github.com/snowplow/iglu-server/pull/128/commits/d4a0940c12181dd40bc5cdc2249f2c84c5296353
*
* Currently we have some libs that depend on circe 0.14.x and some that depend on 0.13.x. These
* reported binary incompatibilities can only be removed once we have bumped cats-effect to version
* 3. For now, we ignore the reported binary incompatibilities because testing shows it is safe.
*/
ThisBuild / libraryDependencySchemes ++= Seq(
"io.circe" %% "circe-core" % "always",
"io.circe" %% "circe-core" % "always",
"io.circe" %% "circe-generic" % "always",
"io.circe" %% "circe-parser" % "always",
"io.circe" %% "circe-jawn" % "always",
"io.circe" %% "circe-parser" % "always",
"io.circe" %% "circe-jawn" % "always"
)

lazy val root = project.in(file("."))
lazy val root = project
.in(file("."))
.aggregate(
aws,
gcp,
@@ -37,7 +39,7 @@ lazy val root = project.in(file("."))
transformerBatch,
transformerKinesis,
transformerPubsub,
transformerKafka
transformerKafka
)

lazy val common: Project = project
@@ -83,8 +85,8 @@ lazy val loader = project
.settings(addCompilerPlugin(Dependencies.betterMonadicFor))
.settings(libraryDependencies ++= Dependencies.loaderDependencies)
.dependsOn(
aws % "compile->compile;test->test;runtime->runtime",
gcp % "compile->compile;test->test",
aws % "compile->compile;test->test;runtime->runtime",
gcp % "compile->compile;test->test",
azure % "compile->compile;test->test"
)

@@ -116,7 +118,7 @@ lazy val snowflakeLoader = project
.settings(excludeDependencies ++= Dependencies.commonExclusions)
.settings(addCompilerPlugin(Dependencies.betterMonadicFor))
.settings(libraryDependencies ++= Dependencies.snowflakeDependencies)
.dependsOn(common % "compile->compile;test->test",loader % "compile->compile;test->test;runtime->runtime")
.dependsOn(common % "compile->compile;test->test", loader % "compile->compile;test->test;runtime->runtime")
.enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)

lazy val snowflakeLoaderDistroless = project
@@ -205,10 +207,12 @@ lazy val transformerKafka = project
.settings(addCompilerPlugin(Dependencies.betterMonadicFor))
.settings(libraryDependencies ++= Dependencies.transformerKafkaDependencies)
.settings(excludeDependencies ++= Dependencies.commonExclusions)
.dependsOn(commonTransformerStream % "compile->compile;test->test;runtime->runtime", azure % "compile->compile;test->test;runtime->runtime")
.dependsOn(
commonTransformerStream % "compile->compile;test->test;runtime->runtime",
azure % "compile->compile;test->test;runtime->runtime"
)
.enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)


lazy val transformerKafkaDistroless = project
.in(file("modules/distroless/transformer-kafka"))
.settings(sourceDirectory := (transformerKafka / sourceDirectory).value)
@@ -218,4 +222,4 @@ lazy val transformerKafkaDistroless = project
.settings(libraryDependencies ++= Dependencies.transformerKafkaDependencies)
.settings(excludeDependencies ++= Dependencies.commonExclusions)
.dependsOn(commonTransformerStream % "compile->compile;test->test;runtime->runtime", azure % "compile->compile;test->test")
.enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)
.enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)
@@ -45,7 +45,7 @@ object Kinesis {

case class Message[F[_]: Sync](record: CommittableRecord) extends Queue.Consumer.Message[F] {
override def content: String = getContent(record)
override def ack: F[Unit] = record.checkpoint
override def ack: F[Unit] = record.checkpoint
}

def consumer[F[_]: Async: Applicative](
@@ -60,7 +60,7 @@ object KinesisProducer {
val withRegion = KinesisClient.builder().region(AWSRegion.of(region.name))
customEndpoint match {
case Some(endpoint) => withRegion.endpointOverride(endpoint).build()
case None => withRegion.build()
case None => withRegion.build()
}
}

@@ -72,7 +72,7 @@
streamName: String,
records: List[PutRecordsRequestEntry]
): F[Unit] = {
val policyForErrors = Retries.fullJitter[F](internalErrorsPolicy)
val policyForErrors = Retries.fullJitter[F](internalErrorsPolicy)
val policyForThrottling = Retries.fibonacci[F](throttlingErrorsPolicy)

def runAndCaptureFailures(ref: Ref[F, List[PutRecordsRequestEntry]]): F[List[PutRecordsRequestEntry]] =
Expand All @@ -88,7 +88,7 @@ object KinesisProducer {
ref <- Ref.of[F, List[PutRecordsRequestEntry]](records)
failures <- runAndCaptureFailures(ref)
.retryingOnFailures(
policy = policyForThrottling,
policy = policyForThrottling,
wasSuccessful = entries => Sync[F].pure(entries.isEmpty),
onFailure = { case (result, retryDetails) =>
val msg = failureMessageForThrottling(result, streamName)
@@ -153,7 +153,7 @@ object KinesisProducer {
.blocking(putRecords(kinesis, streamName, records))
.map(TryBatchResult.build(records, _))
.retryingOnFailuresAndAllErrors(
policy = retryPolicy,
policy = retryPolicy,
wasSuccessful = r => Sync[F].pure(!r.shouldRetrySameBatch),
onFailure = { case (result, retryDetails) =>
val msg = failureMessageForInternalErrors(records, streamName, result)
@@ -41,8 +41,8 @@ object S3 {
*/
def getKey(url: Url[S3Blob]): BlobStorage.BlobObject = {
val bucketName = url.authority.show
val keyPath = url.path.relative.show
val key = BlobStorage.Key.coerce(s"s3://${bucketName}/${keyPath}")
val keyPath = url.path.relative.show
val key = BlobStorage.Key.coerce(s"s3://${bucketName}/${keyPath}")
BlobStorage.BlobObject(key, url.path.representation.size.getOrElse(0L))
}

@@ -91,7 +91,7 @@ object AzureBlobStorage {

def create[F[_]: Async](path: URI, builder: BlobServiceClientBuilder): Resource[F, AzureBlobStorage[F]] = {
val pathParts = PathParts.parse(path.toString)
val client = builder.endpoint(pathParts.root).buildAsyncClient()
val client = builder.endpoint(pathParts.root).buildAsyncClient()
createStore(client).map(new AzureBlobStorage(_, pathParts))
}

@@ -122,11 +122,11 @@
def parse(path: String): PathParts = {
val parts = BlobUrlParts.parse(path)
PathParts(
containerName = parts.getBlobContainerName,
containerName = parts.getBlobContainerName,
storageAccountName = parts.getAccountName,
scheme = parts.getScheme,
endpointSuffix = parts.getHost.stripPrefix(s"${parts.getAccountName}.blob."),
relative = Option(parts.getBlobName).getOrElse("")
scheme = parts.getScheme,
endpointSuffix = parts.getHost.stripPrefix(s"${parts.getAccountName}.blob."),
relative = Option(parts.getBlobName).getOrElse("")
)
}
}
@@ -25,16 +25,16 @@ import org.apache.hadoop.conf.Configuration
*/
class AzureTokenProvider extends CustomTokenProviderAdaptee {

private var expiryTime: Date = _
private var expiryTime: Date = _
private var accountName: String = _

override def initialize(configuration: Configuration, accountName: String): Unit =
this.accountName = accountName

override def getAccessToken: String = {
val creds = new DefaultAzureCredentialBuilder().build()
val creds = new DefaultAzureCredentialBuilder().build()
val request = new TokenRequestContext().addScopes(s"https://$accountName/.default")
val token = creds.getToken(request).block()
val token = creds.getToken(request).block()
this.expiryTime = new Date(token.getExpiresAt.toInstant.toEpochMilli)
token.getToken
}
@@ -22,7 +22,7 @@ object KafkaConsumer {

final case class KafkaMessage[F[_]](record: CommittableConsumerRecord[F, String, Array[Byte]]) extends Queue.Consumer.Message[F] {
override def content: String = new String(record.record.value, StandardCharsets.UTF_8)
override def ack: F[Unit] = record.offset.commit
override def ack: F[Unit] = record.offset.commit
}

def consumer[F[_]: Async](
@@ -49,7 +49,7 @@
override def read: fs2.Stream[F, Consumer.Message[F]] = {
val stream = consumer.records.map(KafkaMessage(_))
postProcess match {
case None => stream
case None => stream
case Some(p) => stream.flatMap(p.process(_))
}
}
@@ -32,23 +32,23 @@ class AzureBlobStorageSpec extends Specification {
"parse root path" in {
PathParts.parse(testContainerPath) must beEqualTo(
PathParts(
containerName = "test-container",
containerName = "test-container",
storageAccountName = "accountName",
scheme = "https",
endpointSuffix = "core.windows.net",
relative = ""
scheme = "https",
endpointSuffix = "core.windows.net",
relative = ""
)
)
}

"parse non-root path" in {
PathParts.parse(s"$testContainerPath/path1/path2/") must beEqualTo(
PathParts(
containerName = "test-container",
containerName = "test-container",
storageAccountName = "accountName",
scheme = "https",
endpointSuffix = "core.windows.net",
relative = "path1/path2/"
scheme = "https",
endpointSuffix = "core.windows.net",
relative = "path1/path2/"
)
)
}
@@ -74,9 +74,9 @@ object Completion {
timestamps <- Sync[F].realTimeInstant.map { now =>
Timestamps(window.toInstant, now, state.minCollector, state.maxCollector)
}
base = BlobStorage.Folder.coerce(root.toString).append(window.getDir)
base = BlobStorage.Folder.coerce(root.toString).append(window.getDir)
shreddingCompletePath = base.withKey(sealFile)
count = LoaderMessage.Count(state.total - state.bad, Some(state.bad))
count = LoaderMessage.Count(state.total - state.bad, Some(state.bad))
message = LoaderMessage.ShreddingComplete(
BlobStorage.Folder.coerce(base),
getTypes(state.types),
@@ -96,7 +96,7 @@
key: BlobStorage.Key,
content: String
): F[Unit] = {
val pipe = blobStorage.put(key, false)
val pipe = blobStorage.put(key, false)
val bytes = Stream.emits[F, Byte](content.getBytes)
bytes.through(pipe).compile.drain
}
@@ -48,10 +48,10 @@ object Processing {

final case class SuccessfulTransformation(original: Event, output: List[Transformed])

type Windowed[A, C] = Record[Window, A, C]
type TransformationResult = Either[BadRow, SuccessfulTransformation]
type Windowed[A, C] = Record[Window, A, C]
type TransformationResult = Either[BadRow, SuccessfulTransformation]
type TransformationResults[C] = (List[TransformationResult], C)
type SerializationResults[C] = (List[(SinkPath, Transformed.Data)], State[C])
type SerializationResults[C] = (List[(SinkPath, Transformed.Data)], State[C])

def run[F[_]: Async: Parallel, C: Checkpointer[F, *]](
resources: Resources[F, C],
@@ -149,12 +149,12 @@
(k: SinkPath) =>
k.pathType match {
case PathType.Good => parquetSink(w)(s)(k)
case PathType.Bad => nonParquetSink(w)(s)(k)
case PathType.Bad => nonParquetSink(w)(s)(k)
}

val dataSink = formats match {
case Formats.WideRow.PARQUET => parquetCombinedSink
case _ => nonParquetSink
case _ => nonParquetSink
}

Partitioned.write[F, Window, SinkPath, Transformed.Data, State[C]](dataSink, config.bufferSize)
@@ -266,7 +266,7 @@
val pathType = if (p.isGood) SinkPath.PathType.Good else SinkPath.PathType.Bad
SinkPath(suffix, pathType)
case p: Transformed.WideRow =>
val suffix = None
val suffix = None
val pathType = if (p.good) SinkPath.PathType.Good else SinkPath.PathType.Bad
SinkPath(suffix, pathType)
case _: Transformed.Parquet =>
@@ -119,7 +119,7 @@ object Resources {
mkBadQueue: Bad.Queue => Resource[F, Queue.ChunkProducer[F]]
): Resource[F, BadSink[F]] =
config.output.bad match {
case Bad.File => Resource.pure[F, BadSink[F]](BadSink.UseBlobStorage())
case Bad.File => Resource.pure[F, BadSink[F]](BadSink.UseBlobStorage())
case queueConfig: Bad.Queue => mkBadQueue(queueConfig).map(BadSink.UseQueue(_))
}

@@ -146,7 +146,7 @@
Resource.eval {
mkAtomicLengths(igluResolver, config).flatMap {
case Right(atomicLengths) => Monad[F].pure(Event.parser(atomicLengths))
case Left(error) => MonadThrow[F].raiseError[EventParser](error)
case Left(error) => MonadThrow[F].raiseError[EventParser](error)
}
}
private def mkAtomicLengths[F[_]: Monad: Clock: RegistryLookup](
@@ -167,13 +167,13 @@
private def getRegionFromConfig(config: Config): Option[String] =
config.input match {
case c: Config.StreamInput.Kinesis => Some(c.region.name)
case _ => None
case _ => None
}

private def getCloudFromConfig(config: Config): Option[Telemetry.Cloud] =
config.input match {
case _: Config.StreamInput.Kinesis => Some(Telemetry.Cloud.Aws)
case _: Config.StreamInput.Pubsub => Some(Telemetry.Cloud.Gcp)
case _ => None
case _: Config.StreamInput.Pubsub => Some(Telemetry.Cloud.Gcp)
case _ => None
}
}
@@ -61,7 +61,7 @@ object Transformer {

def badTransform(badRow: BadRow): Transformed = {
val SchemaKey(vendor, name, _, SchemaVer.Full(model, revision, addition)) = badRow.schemaKey
val data = Transformed.Data.DString(badRow.compact)
val data = Transformed.Data.DString(badRow.compact)
Transformed.Shredded.Json(false, vendor, name, model, revision, addition, data)
}

@@ -98,7 +98,7 @@ object Transformer {
TypesInfo.WideRow.Type(schemaKey, SnowplowEntity.from(shredProperty))
}
val fileFormat = format match {
case Formats.WideRow.JSON => TypesInfo.WideRow.WideRowFormat.JSON
case Formats.WideRow.JSON => TypesInfo.WideRow.WideRowFormat.JSON
case Formats.WideRow.PARQUET => TypesInfo.WideRow.WideRowFormat.PARQUET
}
TypesInfo.WideRow(fileFormat, wrapped.toList)