Skip to content

Commit

Permalink
Adds opentracing to the collector routes
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian Streeter committed Nov 5, 2020
1 parent 5ad57ca commit cc7de51
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 10 deletions.
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ lazy val commonDependencies = Seq(
Dependencies.Libraries.config,
Dependencies.Libraries.prometheus,
Dependencies.Libraries.prometheusCommon,
Dependencies.Libraries.opentracingApi,
Dependencies.Libraries.opentracingNoop,
// Scala
Dependencies.Libraries.scopt,
Dependencies.Libraries.scalaz7,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import io.opentracing.noop.NoopTracerFactory
import org.slf4j.LoggerFactory
import pureconfig._
import pureconfig.generic.{FieldCoproductHint, ProductHint}
import pureconfig.generic.auto._


import metrics._
import model._

Expand Down Expand Up @@ -73,8 +75,11 @@ trait Collector {
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher

val sharedTracer = NoopTracerFactory.create

val collectorRoute = new CollectorRoute {
override def collectorService = new CollectorService(collectorConf, sinks)
override def collectorService = new CollectorService(collectorConf, sinks, sharedTracer)
override def tracer = sharedTracer
}

val prometheusMetricsService = new PrometheusMetricsService(collectorConf.prometheusMetrics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@ package com.snowplowanalytics.snowplow.collectors.scalastream

import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.HttpCookiePair
import akka.http.scaladsl.server.{Directive1, Route}
import akka.http.scaladsl.server.{Directive1, Route, StandardRoute}
import akka.http.scaladsl.server.Directives._

import io.opentracing.{Span, Tracer}

import model.DntCookieMatcher
import monitoring.BeanRegistry

import scala.concurrent.ExecutionContext

trait CollectorRoute {
def collectorService: Service
def tracer: Tracer

private val headers = optionalHeaderValueByName("User-Agent") &
optionalHeaderValueByName("Referer") &
Expand All @@ -42,8 +47,18 @@ trait CollectorRoute {
complete(StatusCodes.NotFound -> "redirects disabled")
}

def completeWithSpan(r: HttpResponse, span: Span): StandardRoute =
requestContext => {
val fut = complete(r)(requestContext)
fut.onComplete { _ =>
span.finish
}(ExecutionContext.global)
fut
}

def routes: Route =
doNotTrack(collectorService.doNotTrackCookie) { dnt =>
val span = tracer.buildSpan("CollectorRequest").start
cookieIfWanted(collectorService.cookieName) { reqCookie =>
val cookie = reqCookie.map(_.toCookie)
headers { (userAgent, refererURI, rawRequestURI) =>
Expand All @@ -69,7 +84,7 @@ trait CollectorRoute {
doNotTrack = dnt,
Some(ct))
incrementRequests(r.status)
complete(r)
completeWithSpan(r, span)
}
}
} ~
Expand All @@ -87,7 +102,7 @@ trait CollectorRoute {
pixelExpected = true,
doNotTrack = dnt)
incrementRequests(r.status)
complete(r)
completeWithSpan(r, span)
}
} ~
path("""ice\.png""".r | "i".r) { path =>
Expand All @@ -105,7 +120,7 @@ trait CollectorRoute {
pixelExpected = true,
doNotTrack = dnt)
incrementRequests(r.status)
complete(r)
completeWithSpan(r, span)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import akka.http.scaladsl.model.headers._
import akka.http.scaladsl.model.headers.CacheDirectives._

import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload
import io.opentracing.Tracer
import io.opentracing.propagation.{Format, TextMapAdapter}
import org.apache.commons.codec.binary.Base64
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -66,7 +68,8 @@ object CollectorService {

class CollectorService(
config: CollectorConfig,
sinks: CollectorSinks
sinks: CollectorSinks,
tracer: Tracer
) extends Service {

private val logger = LoggerFactory.getLogger(getClass)
Expand Down Expand Up @@ -101,6 +104,7 @@ class CollectorService(
doNotTrack: Boolean,
contentType: Option[ContentType] = None
): (HttpResponse, List[Array[Byte]]) = {
//val span = GlobalTracer.get.buildSpan("CollectorRequest").start
val queryParams = Uri.Query(queryString).toMap

val (ipAddress, partitionKey) = ipAndPartitionKey(ip, config.streams.useIpAddressAsPartitionKey)
Expand Down Expand Up @@ -199,6 +203,7 @@ class CollectorService(
networkUserId: String,
contentType: Option[String]
): CollectorPayload = {

val e = new CollectorPayload(
"iglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0",
ipAddress,
Expand All @@ -213,7 +218,7 @@ class CollectorService(
refererUri.foreach(e.refererUri = _)
e.hostname = hostname
e.networkUserId = networkUserId
e.headers = (headers(request) ++ contentType).asJava
e.headers = (headers(request) ++ contentType ++ tracerHeaders).asJava
contentType.foreach(e.contentType = _)
e
}
Expand All @@ -226,8 +231,10 @@ class CollectorService(
// Split events into Good and Bad
val eventSplit = SplitBatch.splitAndSerializePayload(event, sinks.good.MaxBytes)
// Send events to respective sinks
val span = tracer.buildSpan("SinkRawEvents").start
val sinkResponseGood = sinks.good.storeRawEvents(eventSplit.good, partitionKey)
val sinkResponseBad = sinks.bad.storeRawEvents(eventSplit.bad, partitionKey)
span.finish
// Sink Responses for Test Sink
sinkResponseGood ++ sinkResponseBad
}
Expand Down Expand Up @@ -345,6 +352,13 @@ class CollectorService(
case other => Some(other.toString)
}

def tracerHeaders: Iterable[String] = {
val m = scala.collection.mutable.Map.empty[String, String]
val adapter = new TextMapAdapter(m.asJava)
tracer.inject(tracer.activeSpan.context, Format.Builtin.HTTP_HEADERS, adapter)
m.map { case (k, v) => s"$k: $v" }
}

/** If the pixel is requested, this attaches cache control headers to the response to prevent any caching. */
def cacheControl(pixelExpected: Boolean): List[`Cache-Control`] =
if (pixelExpected) List(`Cache-Control`(`no-cache`, `no-store`, `must-revalidate`))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ import akka.http.scaladsl.model.headers._
import akka.http.scaladsl.testkit.Specs2RouteTest
import akka.http.scaladsl.server.Directives._
import com.snowplowanalytics.snowplow.collectors.scalastream.model.DntCookieMatcher
import io.opentracing.noop.NoopTracerFactory
import org.specs2.mutable.Specification

class CollectorRouteSpec extends Specification with Specs2RouteTest {
val mkRoute = (withRedirects: Boolean) => new CollectorRoute {
override val tracer = NoopTracerFactory.create
override val collectorService = new Service {
def preflightResponse(req: HttpRequest): HttpResponse =
HttpResponse(200, entity = "preflight response")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._
import akka.http.scaladsl.model.headers.CacheDirectives._

import io.opentracing.noop.NoopTracerFactory
import org.apache.thrift.{TDeserializer, TSerializer}

import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload
Expand All @@ -35,11 +36,13 @@ import model._
class CollectorServiceSpec extends Specification {
val service = new CollectorService(
TestUtils.testConf,
CollectorSinks(new TestSink, new TestSink)
CollectorSinks(new TestSink, new TestSink),
NoopTracerFactory.create
)
val bouncingService = new CollectorService(
TestUtils.testConf.copy(cookieBounce = TestUtils.testConf.cookieBounce.copy(enabled = true)),
CollectorSinks(new TestSink, new TestSink)
CollectorSinks(new TestSink, new TestSink),
NoopTracerFactory.create
)
val uuidRegex = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}".r
val event = new CollectorPayload(
Expand Down Expand Up @@ -503,7 +506,8 @@ class CollectorServiceSpec extends Specification {
"should pass on the original path if no mapping for it can be found" in {
val service = new CollectorService(
TestUtils.testConf.copy(paths = Map.empty[String, String]),
CollectorSinks(new TestSink, new TestSink)
CollectorSinks(new TestSink, new TestSink),
NoopTracerFactory.create
)
val expected1 = "/com.acme/track"
val expected2 = "/com.acme/redirect"
Expand Down
3 changes: 3 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ object Dependencies {
val thrift = "0.13.0" // force this version of lib from dependencies to mitigate secutiry vulnerabilities, TODO: update underlying libraries
val commonsCodec = "1.13" // force this version of lib from dependencies to mitigate secutiry vulnerabilities, TODO: update underlying libraries
val grpcCore = "1.31.0" // force this version of lib from dependencies to mitigate secutiry vulnerabilities, TODO: update underlying libraries
val opentracing = "0.33.0"
// Scala
val collectorPayload = "0.0.0"
val scalaz7 = "7.0.9"
Expand Down Expand Up @@ -71,6 +72,8 @@ object Dependencies {
val prometheus = "io.prometheus" % "simpleclient" % V.prometheus
val prometheusCommon = "io.prometheus" % "simpleclient_common" % V.prometheus
val cbor = "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % V.cbor
val opentracingApi = "io.opentracing" % "opentracing-api" % V.opentracing
val opentracingNoop = "io.opentracing" % "opentracing-noop" % V.opentracing
val retry = "com.softwaremill.retry" %% "retry" % V.retry

// Scala
Expand Down

0 comments on commit cc7de51

Please sign in to comment.