diff --git a/build.sbt b/build.sbt index 0df6ebf69..743535db0 100644 --- a/build.sbt +++ b/build.sbt @@ -26,6 +26,12 @@ lazy val commonDependencies = Seq( Dependencies.Libraries.config, Dependencies.Libraries.prometheus, Dependencies.Libraries.prometheusCommon, + Dependencies.Libraries.opentracingApi, + Dependencies.Libraries.opentracingNoop, + Dependencies.Libraries.jaeger, + Dependencies.Libraries.jaegerZipkin, + Dependencies.Libraries.zipkin, + Dependencies.Libraries.zipkinSender, // Scala Dependencies.Libraries.scopt, Dependencies.Libraries.scalaz7, diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala index a7e9d1608..e43907b3e 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala @@ -38,7 +38,8 @@ trait Collector { lazy val log = LoggerFactory.getLogger(getClass()) implicit def hint[T] = ProductHint[T](ConfigFieldMapping(CamelCase, CamelCase)) - implicit val _ = new FieldCoproductHint[SinkConfig]("enabled") + implicit val sinkHint = new FieldCoproductHint[SinkConfig]("enabled") + implicit val tracerHint = new FieldCoproductHint[TracerConfig]("enabled") def parseConfig(args: Array[String]): (CollectorConfig, Config) = { case class FileConfig(config: File = new File(".")) @@ -73,8 +74,11 @@ trait Collector { implicit val materializer = ActorMaterializer() implicit val executionContext = system.dispatcher + val sharedTracer = Tracing.tracer(collectorConf.tracer) + val collectorRoute = new CollectorRoute { - override def collectorService = new CollectorService(collectorConf, sinks) + override def collectorService = new CollectorService(collectorConf, sinks, sharedTracer) + override def tracer = sharedTracer } val prometheusMetricsService = new PrometheusMetricsService(collectorConf.prometheusMetrics) diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala index 944c33cd9..e7f4b5872 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala @@ -16,14 +16,19 @@ package com.snowplowanalytics.snowplow.collectors.scalastream import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.HttpCookiePair -import akka.http.scaladsl.server.{Directive1, Route} +import akka.http.scaladsl.server.{Directive1, Route, RouteResult} import akka.http.scaladsl.server.Directives._ +import io.opentracing.{Span, Tracer} + import model.DntCookieMatcher import monitoring.BeanRegistry +import scala.collection.JavaConverters._ +import scala.util.{Success, Failure} trait CollectorRoute { def collectorService: Service + def tracer: Tracer private val headers = optionalHeaderValueByName("User-Agent") & optionalHeaderValueByName("Referer") & @@ -42,22 +47,78 @@ trait CollectorRoute { complete(StatusCodes.NotFound -> "redirects disabled") } + def traceRoute(inner: Span => Route): Route = + requestContext => { + val span = tracer.buildSpan("handle-request").start + span.setTag("http.method", requestContext.request.method.name) + span.setTag("http.url", requestContext.request.uri.toString) + + val fut = inner(span)(requestContext) + fut.onComplete { result => + result match { + case Success(RouteResult.Complete(response)) => + span.setTag("http.status", response.status.intValue) + case Success(RouteResult.Rejected(rejections)) => + span.setTag("error", result.isFailure) + span.log(Map("event" -> "error", "error.object" -> rejections).asJava) + case Failure(e) => + span.setTag("error", result.isFailure) + span.log(Map("event" -> "error", "error.object" -> e).asJava) + } + span.finish + }(requestContext.executionContext) + fut + } + + // Activates the span only for the local thread + def withActiveSpan[T](span: Span)(f: => T): T = { + val scope = tracer.activateSpan(span) + try { + f + } finally { + scope.close + } + } + def routes: Route = doNotTrack(collectorService.doNotTrackCookie) { dnt => - cookieIfWanted(collectorService.cookieName) { reqCookie => - val cookie = reqCookie.map(_.toCookie) - headers { (userAgent, refererURI, rawRequestURI) => - val qs = queryString(rawRequestURI) - extractors { (host, ip, request) => - // get the adapter vendor and version from the path - path(Segment / Segment) { (vendor, version) => - val path = collectorService.determinePath(vendor, version) - post { - extractContentType { ct => - entity(as[String]) { body => + traceRoute { span => + cookieIfWanted(collectorService.cookieName) { reqCookie => + val cookie = reqCookie.map(_.toCookie) + headers { (userAgent, refererURI, rawRequestURI) => + val qs = queryString(rawRequestURI) + extractors { (host, ip, request) => + // get the adapter vendor and version from the path + path(Segment / Segment) { (vendor, version) => + val path = collectorService.determinePath(vendor, version) + post { + extractContentType { ct => + entity(as[String]) { body => + withActiveSpan(span) { + val (r, _) = collectorService.cookie( + qs, + Some(body), + path, + cookie, + userAgent, + refererURI, + host, + ip, + request, + pixelExpected = false, + doNotTrack = dnt, + Some(ct)) + incrementRequests(r.status) + complete(r) + } + } + } + } ~ + (get | head) { + withActiveSpan(span) { val (r, _) = collectorService.cookie( qs, - Some(body), + None, path, cookie, userAgent, @@ -65,47 +126,32 @@ trait CollectorRoute { host, ip, request, - pixelExpected = false, - doNotTrack = dnt, - Some(ct)) + pixelExpected = true, + doNotTrack = dnt) incrementRequests(r.status) complete(r) } } } ~ - (get | head) { - val (r, _) = collectorService.cookie( - qs, - None, - path, - cookie, - userAgent, - refererURI, - host, - ip, - request, - pixelExpected = true, - doNotTrack = dnt) - incrementRequests(r.status) - complete(r) - } - } ~ - path("""ice\.png""".r | "i".r) { path => - (get | head) { - val (r, _) = collectorService.cookie( - qs, - None, - "/" + path, - cookie, - userAgent, - refererURI, - host, - ip, - request, - pixelExpected = true, - doNotTrack = dnt) - incrementRequests(r.status) - complete(r) + path("""ice\.png""".r | "i".r) { path => + (get | head) { + withActiveSpan(span) { + val (r, _) = collectorService.cookie( + qs, + None, + "/" + path, + cookie, + userAgent, + refererURI, + host, + ip, + request, + pixelExpected = true, + doNotTrack = dnt) + incrementRequests(r.status) + complete(r) + } + } } } } diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala index 643169e94..61255939f 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala @@ -23,6 +23,8 @@ import akka.http.scaladsl.model.headers._ import akka.http.scaladsl.model.headers.CacheDirectives._ import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload +import io.opentracing.Tracer +import io.opentracing.propagation.{Format, TextMapAdapter} import org.apache.commons.codec.binary.Base64 import org.slf4j.LoggerFactory @@ -66,7 +68,8 @@ object CollectorService { class CollectorService( config: CollectorConfig, - sinks: CollectorSinks + sinks: CollectorSinks, + tracer: Tracer ) extends Service { private val logger = LoggerFactory.getLogger(getClass) @@ -101,6 +104,7 @@ class CollectorService( doNotTrack: Boolean, contentType: Option[ContentType] = None ): (HttpResponse, List[Array[Byte]]) = { + Option(tracer.activeSpan).map(_.log(Map("message" -> "cookie handler").asJava)) val queryParams = Uri.Query(queryString).toMap val (ipAddress, partitionKey) = ipAndPartitionKey(ip, config.streams.useIpAddressAsPartitionKey) @@ -199,6 +203,7 @@ class CollectorService( networkUserId: String, contentType: Option[String] ): CollectorPayload = { + val e = new CollectorPayload( "iglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0", ipAddress, @@ -213,7 +218,7 @@ class CollectorService( refererUri.foreach(e.refererUri = _) e.hostname = hostname e.networkUserId = networkUserId - e.headers = (headers(request) ++ contentType).asJava + e.headers = (headers(request) ++ contentType ++ tracerHeaders).asJava contentType.foreach(e.contentType = _) e } @@ -225,11 +230,17 @@ class CollectorService( ): List[Array[Byte]] = { // Split events into Good and Bad val eventSplit = SplitBatch.splitAndSerializePayload(event, sinks.good.MaxBytes) - // Send events to respective sinks - val sinkResponseGood = sinks.good.storeRawEvents(eventSplit.good, partitionKey) - val sinkResponseBad = sinks.bad.storeRawEvents(eventSplit.bad, partitionKey) - // Sink Responses for Test Sink - sinkResponseGood ++ sinkResponseBad + val span = tracer.buildSpan("sink-raw-events").start() + span.setTag("component", sinks.good.getClass.getSimpleName) + try { + // Send events to respective sinks + val sinkResponseGood = sinks.good.storeRawEvents(eventSplit.good, partitionKey) + val sinkResponseBad = sinks.bad.storeRawEvents(eventSplit.bad, partitionKey) + // Sink Responses for Test Sink + sinkResponseGood ++ sinkResponseBad + } finally { + span.finish + } } /** Builds the final http response from */ @@ -345,6 +356,17 @@ class CollectorService( case other => Some(other.toString) } + def tracerHeaders: Iterable[String] = + Option(tracer.activeSpan) match { + case Some(span) => + val m = scala.collection.mutable.Map.empty[String, String] + val adapter = new TextMapAdapter(m.asJava) + tracer.inject(span.context, Format.Builtin.HTTP_HEADERS, adapter) + m.map { case (k, v) => s"$k: $v" } + case None => + Iterable() + } + /** If the pixel is requested, this attaches cache control headers to the response to prevent any caching. */ def cacheControl(pixelExpected: Boolean): List[`Cache-Control`] = if (pixelExpected) List(`Cache-Control`(`no-cache`, `no-store`, `must-revalidate`)) diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Tracing.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Tracing.scala new file mode 100644 index 000000000..6af79f307 --- /dev/null +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Tracing.scala @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2013-2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, and + * you may not use this file except in compliance with the Apache License + * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the Apache License Version 2.0 is distributed on an "AS + * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.collectors.scalastream + +import io.jaegertracing.{Configuration => JaegerConfiguration} +import io.jaegertracing.internal.propagation.B3TextMapCodec +import io.jaegertracing.zipkin.ZipkinV2Reporter +import io.opentracing.Tracer +import io.opentracing.noop.NoopTracerFactory +import io.opentracing.propagation.Format +import org.slf4j.LoggerFactory +import zipkin2.reporter.okhttp3.OkHttpSender +import zipkin2.reporter.AsyncReporter + +import scala.collection.JavaConverters._ + +import com.snowplowanalytics.snowplow.collectors.scalastream.model.TracerConfig + + +object Tracing { + + lazy val log = LoggerFactory.getLogger(getClass()) + + def tracer(config: TracerConfig): Tracer = + config match { + case TracerConfig.Noop => + log.debug("Using noop tracer") + NoopTracerFactory.create + case j: TracerConfig.Jaeger => + log.debug("Using jaeger tracer") + new JaegerConfiguration(j.serviceName) + .withReporter { + val rc = new JaegerConfiguration.ReporterConfiguration + rc.withSender { + val sender = new JaegerConfiguration.SenderConfiguration + j.agentHost.foreach(sender.withAgentHost(_)) + j.agentPort.foreach(sender.withAgentPort(_)) + sender + } + rc + } + .withSampler { + val sampler = new JaegerConfiguration.SamplerConfiguration + j.samplerType.foreach(sampler.withType(_)) + j.samplerParam.foreach(sampler.withParam(_)) + j.managerHostPort.foreach(sampler.withManagerHostPort(_)) + sampler + } + .withTracerTags(j.tracerTags.asJava) + .getTracer + case z: TracerConfig.Zipkin => + log.debug("Using zipkin tracer") + val b3Codec = new B3TextMapCodec.Builder().build; + + new JaegerConfiguration(z.serviceName) + .withSampler { + (new JaegerConfiguration.SamplerConfiguration) + .withType(z.samplerType) + .withParam(z.samplerParam) + } + .withTracerTags(z.tracerTags.asJava) + .getTracerBuilder + .withReporter { + new ZipkinV2Reporter(AsyncReporter.create(OkHttpSender.create(z.endpoint))) + } + .registerInjector(Format.Builtin.HTTP_HEADERS, b3Codec) + .registerExtractor(Format.Builtin.HTTP_HEADERS, b3Codec) + .build + } +} diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala index a95332ac1..c2147926f 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala @@ -142,6 +142,28 @@ package model { redirect: Boolean = false, port: Int = 443 ) + + sealed trait TracerConfig + object TracerConfig { + case object Noop extends TracerConfig + case class Jaeger( + serviceName: String, + agentHost: Option[String], + agentPort: Option[Int], + samplerType: Option[String], + samplerParam: Option[Float], + managerHostPort: Option[String], + tracerTags: Map[String, String] = Map() + ) extends TracerConfig + case class Zipkin( + serviceName: String, + endpoint: String, + samplerType: String = "const", + samplerParam: Float = 1.0f, + tracerTags: Map[String, String] = Map() + ) extends TracerConfig + } + final case class CollectorConfig( interface: String, port: Int, @@ -157,7 +179,8 @@ package model { streams: StreamsConfig, prometheusMetrics: PrometheusMetricsConfig, enableDefaultRedirect: Boolean = false, - ssl: SSLConfig = SSLConfig() + ssl: SSLConfig = SSLConfig(), + tracer: TracerConfig = TracerConfig.Noop ) { val cookieConfig = if (cookie.enabled) Some(cookie) else None val doNotTrackHttpCookie = diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRouteSpec.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRouteSpec.scala index 871d8b14d..f4ca8bf66 100644 --- a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRouteSpec.scala +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRouteSpec.scala @@ -19,10 +19,12 @@ import akka.http.scaladsl.model.headers._ import akka.http.scaladsl.testkit.Specs2RouteTest import akka.http.scaladsl.server.Directives._ import com.snowplowanalytics.snowplow.collectors.scalastream.model.DntCookieMatcher +import io.opentracing.noop.NoopTracerFactory import org.specs2.mutable.Specification class CollectorRouteSpec extends Specification with Specs2RouteTest { val mkRoute = (withRedirects: Boolean) => new CollectorRoute { + override val tracer = NoopTracerFactory.create override val collectorService = new Service { def preflightResponse(req: HttpRequest): HttpResponse = HttpResponse(200, entity = "preflight response") diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala index ea3e7be02..2781b722c 100644 --- a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala @@ -24,6 +24,7 @@ import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers._ import akka.http.scaladsl.model.headers.CacheDirectives._ +import io.opentracing.noop.NoopTracerFactory import org.apache.thrift.{TDeserializer, TSerializer} import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload @@ -35,11 +36,13 @@ import model._ class CollectorServiceSpec extends Specification { val service = new CollectorService( TestUtils.testConf, - CollectorSinks(new TestSink, new TestSink) + CollectorSinks(new TestSink, new TestSink), + NoopTracerFactory.create ) val bouncingService = new CollectorService( TestUtils.testConf.copy(cookieBounce = TestUtils.testConf.cookieBounce.copy(enabled = true)), - CollectorSinks(new TestSink, new TestSink) + CollectorSinks(new TestSink, new TestSink), + NoopTracerFactory.create ) val uuidRegex = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}".r val event = new CollectorPayload( @@ -503,7 +506,8 @@ class CollectorServiceSpec extends Specification { "should pass on the original path if no mapping for it can be found" in { val service = new CollectorService( TestUtils.testConf.copy(paths = Map.empty[String, String]), - CollectorSinks(new TestSink, new TestSink) + CollectorSinks(new TestSink, new TestSink), + NoopTracerFactory.create ) val expected1 = "/com.acme/track" val expected2 = "/com.acme/redirect" diff --git a/examples/config.hocon.sample b/examples/config.hocon.sample index 1bcef1e3f..73a0d020b 100644 --- a/examples/config.hocon.sample +++ b/examples/config.hocon.sample @@ -318,6 +318,45 @@ collector { } } + # Enable Jaeger distributed tracing + #tracer { + # enabled = jaeger + # serviceName = collector + # + # # The following are all optional, and use the Jaeger client defaults if not set. + # agentHost = localhost + # agentPort = 6831 + # managerHostPort = localhost:5778 + # # For sampler types see https://www.jaegertracing.io/docs/1.17/sampling/#client-sampling-configuration + # samplerType = remote + # samplerParam = # Not needed for remote sampler type + # + # # Optional tags to append to the trace + # tracerTags = { + # tag1 = value1 + # tag2 = value2 + # } + #} + + # Or enable Zipkin distributed tracing + tracer { + enabled = zipkin + serviceName = collector + endpoint = "http://localhost:9411/api/v2/spans" + + # samplerType must be one of constant, probabilistic, or ratelimiting. Default is constant. + # See https://www.jaegertracing.io/docs/1.17/sampling/#client-sampling-configuration + # Note we use jaeger client sampling, even when we send traces to zipkin backend. + samplerType = ratelimiting + samplerParam = 2.0 + + # Optional tags to append to the trace + tracerTags = { + tag1 = value1 + tag2 = value2 + } + } + } # Akka has a variety of possible configuration options defined at diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 17b653546..e17927121 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -40,6 +40,9 @@ object Dependencies { val thrift = "0.13.0" // force this version of lib from dependencies to mitigate secutiry vulnerabilities, TODO: update underlying libraries val commonsCodec = "1.13" // force this version of lib from dependencies to mitigate secutiry vulnerabilities, TODO: update underlying libraries val grpcCore = "1.31.0" // force this version of lib from dependencies to mitigate secutiry vulnerabilities, TODO: update underlying libraries + val opentracing = "0.33.0" + val jaeger = "1.4.0" + val zipkin = "2.16.0" // Scala val collectorPayload = "0.0.0" val scalaz7 = "7.0.9" @@ -71,6 +74,12 @@ object Dependencies { val prometheus = "io.prometheus" % "simpleclient" % V.prometheus val prometheusCommon = "io.prometheus" % "simpleclient_common" % V.prometheus val cbor = "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % V.cbor + val opentracingApi = "io.opentracing" % "opentracing-api" % V.opentracing + val opentracingNoop = "io.opentracing" % "opentracing-noop" % V.opentracing + val jaeger = "io.jaegertracing" % "jaeger-client" % V.jaeger + val jaegerZipkin = "io.jaegertracing" % "jaeger-zipkin" % V.jaeger + val zipkin = "io.zipkin.reporter2" % "zipkin-reporter" % V.zipkin + val zipkinSender = "io.zipkin.reporter2" % "zipkin-sender-okhttp3" % V.zipkin val retry = "com.softwaremill.retry" %% "retry" % V.retry // Scala