Upgrading from Flink 1.3.2 to 1.4.0: Hadoop FileSystem and Path issues

apache-flink avro parquet flink-streaming


I recently tried upgrading from Flink 1.3.2 to 1.4.0 and I can no longer import org.apache.hadoop.fs.{FileSystem, Path}. The issue occurs in two places:

ParquetWriter:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.flink.streaming.connectors.fs.Writer
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

class AvroWriter[T <: GenericRecord]() extends Writer[T] {

  @transient private var writer: ParquetWriter[T] = _
  @transient private var schema: Schema = _

  override def write(element: T): Unit = {
    schema = element.getSchema
    writer.write(element)
  }

  override def duplicate(): AvroWriter[T] = new AvroWriter[T]()

  override def close(): Unit = writer.close()

  override def getPos: Long = writer.getDataSize

  override def flush(): Long = writer.getDataSize

  override def open(fs: FileSystem, path: Path): Unit = {
    writer = AvroParquetWriter.builder[T](path)
      .withSchema(schema)
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .build()
  }

}

CustomBucketer:

import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer
import org.apache.flink.streaming.connectors.fs.Clock
import org.apache.hadoop.fs.{FileSystem, Path}
import java.io.ObjectInputStream
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.avro.generic.GenericRecord

import scala.reflect.ClassTag

class RecordFieldBucketer[T <: GenericRecord: ClassTag](dateField: String = null, dateFieldFormat: String = null, bucketOrder: Seq[String]) extends Bucketer[T] {

  @transient var dateFormatter: SimpleDateFormat = _

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    if (dateField != null && dateFieldFormat != null) {
      dateFormatter = new SimpleDateFormat(dateFieldFormat)
    }
  }

  override def getBucketPath(clock: Clock, basePath: Path, element: T): Path = {
    val partitions = bucketOrder.map(field => {
      if (field == dateField) {
        field + "=" + dateFormatter.format(new Date(element.get(field).asInstanceOf[Long]))
      } else {
        field + "=" + element.get(field)
      }
    }).mkString("/")
    new Path(basePath + "/" + partitions)
  }

}

I noticed that Flink now has:

import org.apache.flink.core.fs.{FileSystem, Path}

But the new Path doesn't appear to work with the AvroParquetWriter or the getBucketPath method. I know there have been some changes to Flink's FileSystem and Hadoop dependencies, and I'm just not sure what I need to import to make my code work again.

Do I even need to use the Hadoop dependencies, or are there now different ways of writing and bucketing Parquet files to S3?

build.sbt:

val flinkVersion = "1.4.0"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
  "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
  "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
  "org.apache.flink" % "flink-metrics-core" % flinkVersion,
  "org.apache.flink" % "flink-metrics-graphite" % flinkVersion,
  "org.apache.kafka" %% "kafka" % "0.10.0.1",
  "org.apache.avro" % "avro" % "1.7.7",
  "org.apache.parquet" % "parquet-hadoop" % "1.8.1",
  "org.apache.parquet" % "parquet-avro" % "1.8.1",
  "io.confluent" % "kafka-avro-serializer" % "3.2.2",
  "com.fasterxml.jackson.core" % "jackson-core" % "2.9.2"
)
Asked by moku on December 27, 2017

2 Answers



Building a "Hadoop-free Flink" was one major feature of the 1.4 release. All you have to do is include the Hadoop dependencies on your classpath. Or, quoting the changelog:

... This also means that in cases where you used connectors to HDFS, such as the BucketingSink or RollingSink, you now have to ensure that you either use a Flink distribution with bundled Hadoop dependencies or make sure to include Hadoop dependencies when building a jar file for your application.
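
For an sbt build like the one in the question, that means adding a Hadoop artifact so it ends up in the application jar. A minimal sketch, assuming Hadoop 2.8.3 (the version is an assumption; match it to your cluster) and that your Flink distribution does not already bundle Hadoop:

// Hypothetical addition to build.sbt: brings org.apache.hadoop.fs.{FileSystem, Path}
// and the rest of the Hadoop client classes back onto the classpath.
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.8.3"
// Mark it % Provided instead if you deploy on a Flink distribution with bundled
// Hadoop dependencies, since the classes are then already available at runtime.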

Answered by TobiSH on December 27, 2017


Accepted answer

The necessary org.apache.hadoop.fs.{FileSystem, Path} classes are found in the hadoop-common artifact.
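
In sbt terms that maps to one extra line in the build.sbt above. A minimal sketch; the 2.8.3 version is an assumption and should match your cluster's Hadoop version:

// Hypothetical build.sbt addition: hadoop-common contains org.apache.hadoop.fs.{FileSystem, Path}
libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.8.3"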

Answered by moku on December 28, 2017