Flume
Introduction to Flume
Apache Flume is a Hadoop ecosystem project, originally developed at Cloudera, designed to capture, transform, and ingest data into HDFS using one or more agents.
It is an ideal fit for streams of data that we would like to aggregate, store, and analyze using Hadoop.
Flume is designed for high-volume ingestion of event-based data into Hadoop.
The initial use case was capturing log files (such as web server logs) from a source system and routing their messages into HDFS as they are generated.
The usual destination (or sink in Flume parlance) is HDFS. However, Flume is flexible enough to write to other systems, like HBase or Solr.
Flume Agents
To use Flume, we need to run a Flume agent, which is a long-lived Java process that runs sources and sinks, connected by channels.
Agents can connect a data source directly to HDFS or to other downstream agents.
Agents can also perform in-flight data operations, including basic transformations, compression, encryption, batching of events and more.
A Flume installation is made up of a collection of connected agents running in a distributed topology.
Agents on the edge of the system (co-located on web server machines, for example) collect data and forward it to agents that are responsible for aggregating and then storing the data in its final destination.
Agents are configured to run a collection of particular sources and sinks, so using Flume is mainly a configuration exercise in wiring the pieces together.
A Flume agent source tells the agent where to receive data from.
A Flume agent sink tells the agent where to send data.
- Often the destination is HDFS, which was the original intention for the project.
- However, the destination could be another agent that will do some further in-flight processing, or another filesystem such as S3.
The Flume agent channel is a queue between the agent’s source and sink.
- Flume implements a transactional architecture for added reliability. This enables rollback and retry operations if required.
A source in Flume produces events and delivers them to the channel, which stores the events until they are forwarded to the sink.
You can think of the source-channel-sink combination as a basic Flume building block.
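The choice of channel type also affects reliability. As a minimal sketch (the property values below are illustrative assumptions, not taken from the examples that follow), a memory channel can be tuned with capacity settings, while a file channel persists events to local disk so they survive an agent restart:
# Option 1: memory channel with illustrative capacity settings
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 10000
agent1.channels.channel1.transactionCapacity = 1000
# Option 2 (alternative to the above): file channel that persists events to disk
# (the checkpointDir and dataDirs paths are hypothetical)
agent1.channels.channel1.type = file
agent1.channels.channel1.checkpointDir = /var/flume/checkpoint
agent1.channels.channel1.dataDirs = /var/flume/data
Only one type would be configured for a given channel; the memory channel is faster but loses buffered events if the agent process dies.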
Flume Agent Example
# Flume Components
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Source
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f logfile.log
agent1.sources.source1.channels = channel1
# Sink
agent1.sinks.sink1.type = logger
agent1.sinks.sink1.channel = channel1
# Channel
agent1.channels.channel1.type = memory
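This first example tails a log file and simply writes each event to Flume's own log via the logger sink, which is useful for testing a configuration. Assuming the configuration above is saved in a file named agent1.properties (a hypothetical filename), the agent can be started with the flume-ng command; the --name argument must match the agent name used in the configuration (agent1 here):
flume-ng agent --conf conf --conf-file agent1.properties --name agent1 -Dflume.root.logger=INFO,console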
HDFS Sink
The point of Flume is to deliver large amounts of data into a Hadoop data store, so let’s look at how to configure a Flume agent to deliver events to an HDFS sink.
The only two settings that are required are the sink’s type (hdfs) and hdfs.path.
# Flume Components
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Source
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f logfile.log
agent1.sources.source1.channels = channel1
# Sink
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = flume/simple
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.channel = channel1
# Channel
agent1.channels.channel1.type = memory
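In these examples, hdfs.fileType = DataStream makes Flume write the raw event data directly instead of the default SequenceFile format. The next configuration extends the HDFS sink with hdfs.filePrefix and hdfs.fileSuffix, which control the names of the files Flume creates in the target directory.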
# Flume Components
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Source
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f logfile.log
agent1.sources.source1.channels = channel1
# Sink
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = flume/simple
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.filePrefix = events
agent1.sinks.sink1.hdfs.fileSuffix = .log
agent1.sinks.sink1.channel = channel1
# Channel
agent1.channels.channel1.type = memory
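By default the HDFS sink rolls to a new file at fairly small thresholds, which can produce many small files in HDFS. As a minimal sketch (the values below are illustrative assumptions, not part of the original example), roll behaviour can be controlled with the hdfs.rollInterval, hdfs.rollSize, and hdfs.rollCount properties:
# Roll a new file every 10 minutes or at roughly 128 MB, whichever comes first;
# setting rollCount to 0 disables rolling by event count
agent1.sinks.sink1.hdfs.rollInterval = 600
agent1.sinks.sink1.hdfs.rollSize = 134217728
agent1.sinks.sink1.hdfs.rollCount = 0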
Fan Out
Fan out is the term for delivering events from one source to multiple channels, so they reach multiple sinks.
The configuration below delivers events to both a local file_roll sink (local-sink via local-memory-channel) and an HDFS sink (hdfs-sink via hdfs-memory-channel).
# Flume Components
agent1.sources = tail-source
agent1.sinks = local-sink hdfs-sink
agent1.channels = local-memory-channel hdfs-memory-channel
# Channels
agent1.channels.local-memory-channel.type = memory
agent1.channels.hdfs-memory-channel.type = memory
# Source
agent1.sources.tail-source.type = exec
agent1.sources.tail-source.command = tail -F logfile.log
agent1.sources.tail-source.channels = local-memory-channel hdfs-memory-channel
# Define a sink that outputs to local file.
agent1.sinks.local-sink.type = file_roll
agent1.sinks.local-sink.sink.directory = flume/replicate
agent1.sinks.local-sink.sink.rollInterval = 90
agent1.sinks.local-sink.channel = local-memory-channel
# Define a sink that outputs to hdfs.
agent1.sinks.hdfs-sink.type = hdfs
agent1.sinks.hdfs-sink.hdfs.path = flume/replicate
agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink.hdfs.rollCount = 5
agent1.sinks.hdfs-sink.hdfs.inUseSuffix = .tmp
agent1.sinks.hdfs-sink.channel = hdfs-memory-channel
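When a source lists multiple channels, it uses a replicating selector by default, copying every event to all of its channels, as in the example above. A multiplexing selector can instead route events to different channels based on the value of an event header. The sketch below assumes events carry a header named datacenter, set upstream of the channel (for example by an interceptor); the header name and its values are illustrative assumptions:
# Route events by the value of the "datacenter" header
agent1.sources.tail-source.selector.type = multiplexing
agent1.sources.tail-source.selector.header = datacenter
agent1.sources.tail-source.selector.mapping.east = local-memory-channel
agent1.sources.tail-source.selector.mapping.west = hdfs-memory-channel
agent1.sources.tail-source.selector.default = local-memory-channel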
Agent Tiers
With the setup described so far, if one agent runs on every node producing raw data, then at any particular time each file being written to HDFS consists entirely of the events from a single node.
It would be better if we could aggregate the events from a group of nodes in a single file, since this would result in fewer, larger files.
Aggregating Flume events is achieved by having tiers of Flume agents. The first tier collects events from the original sources (such as web servers) and sends them to a smaller set of agents in the second tier, which aggregate events from the first tier before writing them to HDFS.
Tiers are constructed by using a special sink that sends events over the network, and a corresponding source that receives events.
The Avro sink sends events over Avro RPC (Remote Procedure Call) to an Avro source running in another Flume agent.
## First-tier agent
# Flume Components
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Source
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F logfile.log
agent1.sources.source1.channels = channel1
# Define an Avro sink that sends events to the second-tier agent's Avro source.
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.type = avro
agent1.sinks.sink1.hostname = 172.31.43.67
agent1.sinks.sink1.port = 14000
# Channels
agent1.channels.channel1.type = memory
## Second-tier agent
# Flume Components
agent2.sources = source2
agent2.sinks = sink2
agent2.channels = channel2
# Avro source that receives events from the first-tier agent's Avro sink
agent2.sources.source2.channels = channel2
agent2.sources.source2.type = avro
agent2.sources.source2.bind = 172.31.43.67
agent2.sources.source2.port = 14000
# Sink
agent2.sinks.sink2.type = hdfs
agent2.sinks.sink2.hdfs.path = flume/agent_tiers
agent2.sinks.sink2.hdfs.fileType = DataStream
agent2.sinks.sink2.hdfs.filePrefix = events
agent2.sinks.sink2.hdfs.fileSuffix = .log
agent2.sinks.sink2.channel = channel2
# Channel
agent2.channels.channel2.type = memory
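To run this topology, the second-tier agent is typically started first so that its Avro source is already listening on port 14000 before the first-tier agent's Avro sink tries to connect. Assuming the two configurations are saved as tier1.properties and tier2.properties (hypothetical filenames) on their respective machines:
# On the aggregating machine (second tier)
flume-ng agent --conf conf --conf-file tier2.properties --name agent2
# On each machine producing raw data (first tier)
flume-ng agent --conf conf --conf-file tier1.properties --name agent1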