abstract class TwoPhaseCommitBatchStorage[-B <: RecordBatch, S] extends RecordBatchStorage[B] with Logging with Metrics
An abstract batch storage that stores batches and commits offsets to Kafka in a two-phase transaction. Committed stream positions are looked up in Kafka. The batch commit algorithm proceeds in the following phases:
- stage the batch to storage (e.g. upload file to a temporary path),
- stage offsets to Kafka by performing an offset commit that leaves the actual offset unchanged and instead writes the new offset and the staged batch information (e.g. the path of the temporary uploaded file) to the offset commit metadata field, serialized as compressed, base64-encoded JSON,
- store the staged batch (e.g. move the temporary file to the final destination),
- commit new offsets to Kafka and clear the staging information from the offset metadata.
If committing fails in the first two phases, recovery reverts the transaction; if it fails afterwards, recovery completes it.
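The four phases and the recovery rule above can be sketched with in-memory stand-ins. This is only an illustration under stated assumptions: `Staging`, `OffsetEntry`, `TwoPhaseSketch` and the `failAfterPhase` switch are hypothetical names that do not exist in the library, the map `files` stands in for real storage, and `entry` stands in for the Kafka offset and its metadata. The recovery logic reflects one reading of the description (complete if the batch is already stored, revert otherwise), not the library's actual implementation.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-ins; none of these names come from the library.
final case class Staging(tempPath: String, finalPath: String)
final case class OffsetEntry(offset: Long, staged: Option[(Long, Staging)])

final class TwoPhaseSketch {
  val files = mutable.Map.empty[String, String]   // simulated storage: path -> contents
  var entry: OffsetEntry = OffsetEntry(0L, None)  // simulated committed offset plus metadata

  def commit(data: String, newOffset: Long, failAfterPhase: Int = 0): Unit = {
    // Simulates a crash right after the given phase (0 means never crash).
    def checkpoint(phase: Int): Unit =
      if (phase == failAfterPhase)
        throw new RuntimeException(s"simulated crash after phase $phase")

    // Phase 1: stage the batch under a temporary path.
    val staging = Staging(s"tmp/$newOffset", s"final/$newOffset")
    files.update(staging.tempPath, data)
    checkpoint(1)

    // Phase 2: leave the offset unchanged, record the new offset and staging info as metadata.
    entry = entry.copy(staged = Some((newOffset, staging)))
    checkpoint(2)

    // Phase 3: move the staged batch to its final destination.
    files.remove(staging.tempPath).foreach(d => files.update(staging.finalPath, d))
    checkpoint(3)

    // Phase 4: advance the offset and clear the staging metadata.
    entry = OffsetEntry(newOffset, None)
  }

  def recover(): Unit = entry.staged.foreach { case (newOffset, staging) =>
    if (files.contains(staging.finalPath)) {
      // The batch reached its final destination: complete the transaction.
      entry = OffsetEntry(newOffset, None)
    } else {
      // The batch was never stored: drop the staged data and revert the metadata.
      files.remove(staging.tempPath)
      entry = entry.copy(staged = None)
    }
  }
}
```

A crash after phase 1 leaves no staging metadata behind, so in this sketch `recover()` has nothing to do and the temporary entry is simply orphaned.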
Implementers need to define the batch staging and storing.
- B
Type of record batches.
- S
Type of the batch staging information; must be JSON serializable.
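Because the staging information travels in the offset commit metadata field as compressed, base64-encoded JSON, a round trip of that encoding might look like the sketch below. The source does not name the compression scheme, so GZIP is an assumption here, and `StagingMetadataCodec` is a hypothetical helper that takes already-serialized JSON rather than using the library's `JsonSerializer`.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Base64
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.io.Source

// Hypothetical helper; GZIP is an assumed compression scheme, not confirmed by the source.
object StagingMetadataCodec {
  // JSON text -> GZIP bytes -> base64 string suitable for a metadata field.
  def encode(json: String): String = {
    val bytes = new ByteArrayOutputStream()
    val gzip = new GZIPOutputStream(bytes)
    try gzip.write(json.getBytes(UTF_8)) finally gzip.close()
    Base64.getEncoder.encodeToString(bytes.toByteArray)
  }

  // base64 string -> GZIP bytes -> JSON text.
  def decode(metadata: String): String = {
    val compressed = Base64.getDecoder.decode(metadata)
    val gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))
    try Source.fromInputStream(gzip, "UTF-8").mkString finally gzip.close()
  }
}
```

Keeping `S` small (for example a pair of paths) matters in practice, since the offset metadata field is not meant to carry large payloads.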
Instance Constructors
- new TwoPhaseCommitBatchStorage()(implicit arg0: JsonSerializer[S])
Type Members
- class AssignableGauge[T <: AnyRef] extends AnyRef
- Definition Classes
- Metrics
Abstract Value Members
- abstract def isBatchStored(staging: S): Boolean
Checks whether a staged batch is actually stored, used during recovery.
- staging
Batch staging information.
- returns
Whether the batch is fully stored.
- Attributes
- protected
- abstract def stageBatch(batch: B): S
Stages a record batch to storage.
- batch
Record batch to store.
- returns
Information about the staging.
- Attributes
- protected
- abstract def storeBatch(staging: S): Unit
Finalizes storage of a staged record batch.
- staging
Batch staging information.
- Attributes
- protected
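As an illustration of how the three abstract members fit together, here is a minimal local-file sketch. To stay compilable without the library it does not extend `TwoPhaseCommitBatchStorage`; instead `StagedStorage` is a hypothetical trait that mirrors only the three signatures above. The batch type `Seq[String]`, the `FileStaging` case class and `LocalFileStorage` are likewise assumptions made for the example.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path, Paths, StandardCopyOption}

// Hypothetical mirror of the three abstract members, so the sketch compiles on its own.
trait StagedStorage[B, S] {
  protected def stageBatch(batch: B): S
  protected def storeBatch(staging: S): Unit
  protected def isBatchStored(staging: S): Boolean
}

// Staging information made of plain strings, so it is trivially JSON serializable.
final case class FileStaging(tempPath: String, finalPath: String)

// Members are widened to public here only so the sketch can be exercised directly.
class LocalFileStorage(root: Path) extends StagedStorage[Seq[String], FileStaging] {

  // Stage: write the batch to a temporary file and remember where it should end up.
  override def stageBatch(batch: Seq[String]): FileStaging = {
    val temp = Files.createTempFile(root, "batch-", ".tmp")
    Files.write(temp, batch.mkString("\n").getBytes(UTF_8))
    val finalName = temp.getFileName.toString.stripSuffix(".tmp") + ".txt"
    FileStaging(temp.toString, root.resolve(finalName).toString)
  }

  // Store: move the temporary file to its final destination in one step.
  override def storeBatch(staging: FileStaging): Unit = {
    Files.move(
      Paths.get(staging.tempPath),
      Paths.get(staging.finalPath),
      StandardCopyOption.ATOMIC_MOVE)
    ()
  }

  // Recovery check: the batch counts as stored once the final file exists.
  override def isBatchStored(staging: FileStaging): Boolean =
    Files.exists(Paths.get(staging.finalPath))
}
```

The move in `storeBatch` is kept atomic so that `isBatchStored` never observes a half-written final file; a real implementation against remote storage would need an equivalent guarantee.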
Concrete Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native()
- final def commitBatch(batch: B): Unit
Commits a given batch to storage.
- Definition Classes
- TwoPhaseCommitBatchStorage → RecordBatchStorage
- final def committedPositions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, Option[StreamPosition]]
Gets the latest committed stream positions for the given partitions.
- Definition Classes
- TwoPhaseCommitBatchStorage → RecordBatchStorage
- def createAssignableGauge[T <: AnyRef](name: String, tdf: ToDoubleFunction[T], tags: Seq[MetricTag] = Seq()): AssignableGauge[T]
- Attributes
- protected
- Definition Classes
- Metrics
- def createCounter(name: String, tags: Seq[MetricTag] = Seq()): Counter
- Attributes
- protected
- Definition Classes
- Metrics
- def createDistribution(name: String, tags: Seq[MetricTag] = Seq()): DistributionSummary
- Attributes
- protected
- Definition Classes
- Metrics
- def createGauge[T <: AnyRef](name: String, metric: T, tdf: ToDoubleFunction[T], tags: Seq[MetricTag] = Seq()): Gauge
- Attributes
- protected
- Definition Classes
- Metrics
- def createTimer(name: String, tags: Seq[MetricTag] = Seq(), maxDuration: Duration = null): Timer
- Attributes
- protected
- Definition Classes
- Metrics
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- def initialize(context: KafkaContext): Unit
Initializes the storage with a Kafka context, which can be used to look up or commit offsets if needed.
- Definition Classes
- RecordBatchStorage
- def isBatchCommitted(batch: B): Boolean
Checks whether a given batch was successfully committed to storage by comparing committed positions with the record ranges in the batch.
- batch
Batch to check.
- returns
Whether the batch is successfully stored.
- Definition Classes
- RecordBatchStorage
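The comparison of committed positions with record ranges could look roughly like the sketch below. It is not the library's code: `RecordRange` and `CommitCheck` are hypothetical, positions are reduced to plain offsets, and the check assumes the usual Kafka convention that a committed offset is the next offset to read, so a range counts as committed only when the committed offset lies past its last record.

```scala
// Hypothetical simplified model of one partition's record range within a batch.
final case class RecordRange(topic: String, partition: Int, startOffset: Long, endOffset: Long)

object CommitCheck {
  // A batch counts as committed only if every range lies entirely below the committed
  // offset of its partition (assumed to be the next offset to read).
  def isCommitted(
      ranges: Seq[RecordRange],
      committed: Map[(String, Int), Option[Long]]): Boolean =
    ranges.forall { r =>
      committed.get((r.topic, r.partition)).flatten.exists(_ > r.endOffset)
    }
}
```

A partition with no committed position, or one missing from the map entirely, makes the whole batch count as uncommitted in this sketch.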
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- val kafkaContext: KafkaContext
- Attributes
- protected
- Definition Classes
- RecordBatchStorage
- val log: Logger
- Attributes
- protected
- Definition Classes
- Logging
- def metricsRoot: String
A common prefix for all created metrics.
- Attributes
- protected
- Definition Classes
- TwoPhaseCommitBatchStorage → Metrics
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- final def recover(topicPartitions: Set[TopicPartition]): Unit
Performs any needed recovery upon startup, e.g. rolling back or completing transactions. Can fail; users should handle any possible exceptions.
- Definition Classes
- TwoPhaseCommitBatchStorage → RecordBatchStorage
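Since recovery can fail, callers may want to wrap it. One possible approach, shown here as a sketch rather than a recommended policy, is a bounded retry that returns a `Try`. `RecoverWithRetry` is a hypothetical helper and is not part of the library; it retries only non-fatal exceptions, because that is what `scala.util.Try` catches.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Try}

// Hypothetical helper: runs an action up to `attemptsLeft` times, returning the last outcome.
object RecoverWithRetry {
  @tailrec
  def retry[T](attemptsLeft: Int)(action: () => T): Try[T] =
    Try(action()) match {
      case Failure(_) if attemptsLeft > 1 => retry(attemptsLeft - 1)(action) // try again
      case result                         => result                         // success or final failure
    }
}
```

With a storage instance and a partition set at hand (both hypothetical here), usage would be along the lines of `RecoverWithRetry.retry(3)(() => storage.recover(partitions))`, followed by a decision on whether a final `Failure` should stop the loader.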
- def removeMeters(meters: Meter*): Unit
- Attributes
- protected
- Definition Classes
- Metrics
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def toString(): String
- Definition Classes
- AnyRef → Any
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()