diff --git a/reader.go b/reader.go index 39bcb49..83410eb 100644 --- a/reader.go +++ b/reader.go @@ -75,7 +75,8 @@ type ReaderConfig struct { } type ConsumeConfig struct { - Limit int64 `json:"limit"` + Limit int64 `json:"limit"` + Name string `json:"name"` } type Duration struct { @@ -323,6 +324,7 @@ func (k *Kafka) consume( } messages := make([]map[string]interface{}, 0) + messagesName := consumeConfig.Name maxWait := reader.Config().MaxWait @@ -331,7 +333,7 @@ func (k *Kafka) consume( msg, err := reader.ReadMessage(ctxWithTimeout) cancel() if errors.Is(err, io.EOF) { - k.reportReaderStats(reader.Stats()) + k.reportReaderStats(reader.Stats(), messagesName) err = NewXk6KafkaError(noMoreMessages, "No more messages.", nil) logger.WithField("error", err).Info(err) @@ -339,7 +341,7 @@ func (k *Kafka) consume( } if err != nil { - k.reportReaderStats(reader.Stats()) + k.reportReaderStats(reader.Stats(), messagesName) err = NewXk6KafkaError(failedReadMessage, "Unable to read messages.", nil) logger.WithField("error", err).Error(err) @@ -376,13 +378,13 @@ func (k *Kafka) consume( messages = append(messages, message) } - k.reportReaderStats(reader.Stats()) + k.reportReaderStats(reader.Stats(), messagesName) return messages } // reportReaderStats reports the reader stats // nolint:funlen -func (k *Kafka) reportReaderStats(currentStats kafkago.ReaderStats) { +func (k *Kafka) reportReaderStats(currentStats kafkago.ReaderStats, messagesName string) { state := k.vu.State() if state == nil { logger.WithField("error", ErrForbiddenInInitContext).Error(ErrForbiddenInInitContext) @@ -400,6 +402,7 @@ func (k *Kafka) reportReaderStats(currentStats kafkago.ReaderStats) { sampleTags := ctm.Tags.With("topic", currentStats.Topic) sampleTags = sampleTags.With("clientid", currentStats.ClientID) sampleTags = sampleTags.With("partition", currentStats.Partition) + sampleTags = sampleTags.With("name", messagesName) now := time.Now() metrics.PushIfNotDone(ctx, state.Samples, metrics.ConnectedSamples{ diff --git a/writer.go b/writer.go index b6cbb15..4845672 100644 --- a/writer.go +++ b/writer.go @@ -62,6 +62,7 @@ type Message struct { Key []byte `json:"key"` Value []byte `json:"value"` Headers map[string]interface{} `json:"headers"` + Name string `json:"name"` // If not set at the creation, Time will be automatically set when // writing the message. @@ -207,6 +208,7 @@ func (k *Kafka) produce(writer *kafkago.Writer, produceConfig *ProduceConfig) { } kafkaMessages := make([]kafkago.Message, len(produceConfig.Messages)) + kafkaMessagesName := produceConfig.Messages[0].Name for index, message := range produceConfig.Messages { kafkaMessages[index] = kafkago.Message{ Offset: message.Offset, @@ -247,7 +249,7 @@ func (k *Kafka) produce(writer *kafkago.Writer, produceConfig *ProduceConfig) { originalErr := writer.WriteMessages(k.vu.Context(), kafkaMessages...) - k.reportWriterStats(writer.Stats()) + k.reportWriterStats(writer.Stats(), kafkaMessagesName) if originalErr != nil { err := NewXk6KafkaError(writerError, "Error writing messages.", originalErr) @@ -258,7 +260,7 @@ func (k *Kafka) produce(writer *kafkago.Writer, produceConfig *ProduceConfig) { // reportWriterStats reports the writer stats to the state. // nolint: funlen -func (k *Kafka) reportWriterStats(currentStats kafkago.WriterStats) { +func (k *Kafka) reportWriterStats(currentStats kafkago.WriterStats, messagesName string) { state := k.vu.State() if state == nil { logger.WithField("error", ErrForbiddenInInitContext).Error(ErrForbiddenInInitContext) @@ -274,6 +276,7 @@ func (k *Kafka) reportWriterStats(currentStats kafkago.WriterStats) { ctm := k.vu.State().Tags.GetCurrentValues() sampleTags := ctm.Tags.With("topic", currentStats.Topic) + sampleTags = sampleTags.With("name", messagesName) now := time.Now()