1. Overview of the flow
2. Use ListHDFS to get all file names
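To fetch the listing again, right-click the processor and choose View state, then click Clear state. Run the processor again and it will collect the data once more.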
3. Use FetchHDFS to read the JSON data
4. Use ExecuteScript to convert the data (JSON to CSV)
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.*
import java.text.SimpleDateFormat
import groovy.json.*

def flowFile = session.get()
// Nothing to do if no flow file is queued
if (!flowFile) return

flowFile = session.write(flowFile, { inputStream, outputStream ->
    def js = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    def data = new JsonSlurper().parseText(js)
    // Union of all keys across the records under "data", in first-seen order
    def columns = data.data*.keySet().flatten().unique()

    // Wrap strings in double quotes, and remove nulls
    def encode = { e -> e == null ? '' : e instanceof String ? /"$e"/ : "$e" }

    // Print all the column names
    def columnName = columns.collect { c -> encode(c) }.join(',')

    // Then create all the rows
    def columnData = data.data.collect { row ->
        // A row at a time
        columns.collect { colName -> encode(row[colName]) }.join(',')
    }.join('\n')

    StringBuilder cd = new StringBuilder()
    cd.append(columnName + "\n")
    cd.append(columnData)
    outputStream.write(cd.toString().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)
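To see what this conversion produces without running a NiFi flow, the same logic can be tried as a standalone Groovy script. This is only a sketch: the function name jsonToCsv and the sample input are made up for illustration, and it assumes the input has the same shape the script above expects (a top-level "data" array of flat objects).

```groovy
import groovy.json.JsonSlurper

// Hypothetical helper for illustration: applies the same JSON-to-CSV logic
// as the ExecuteScript body, but on a plain String instead of a flow file.
def jsonToCsv(String js) {
    def data = new JsonSlurper().parseText(js)
    // Union of all keys across the records, in first-seen order
    def columns = data.data*.keySet().flatten().unique()
    // Strings are wrapped in double quotes, nulls become empty fields
    def encode = { e -> e == null ? '' : e instanceof String ? "\"${e}\"" : "${e}" }
    def header = columns.collect { c -> encode(c) }.join(',')
    def rows = data.data.collect { row ->
        columns.collect { colName -> encode(row[colName]) }.join(',')
    }.join('\n')
    return header + '\n' + rows
}

// Made-up sample: the second record has a null value and an extra key
def sample = '{"data":[{"id":1,"name":"alice"},{"id":2,"name":null,"city":"SH"}]}'
println jsonToCsv(sample)
```

Note that a key missing from a record and a key whose value is null both end up as an empty field, and that double quotes inside string values are not escaped, so values containing quotes or commas would need extra handling.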
5. Use PutHDFS to write the result to HDFS
Problem:
After we recently switched NiFi to cluster mode, ListHDFS stopped returning data.
The logs showed a connection refused error related to ZooKeeper.
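When NiFi runs as a cluster it relies on ZooKeeper for coordination, so it must be able to reach our ZooKeeper server. One setting has to be added for that.
The setting goes in conf/state-management.xml: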
<cluster-provider>
    <id>zk-provider</id>
    <class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
    <property name="Connect String">wdp.xxx.cn:2181</property>
    <property name="Root Node">/nifi</property>
    <property name="Session Timeout">30 seconds</property>
    <property name="Access Control">CreatorOnly</property>
    <property name="Username">nifi</property>
    <property name="Password">nifi</property>
</cluster-provider>
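Since the symptom was a connection refused error, it may help to confirm that the ZooKeeper port is reachable from the NiFi host before restarting NiFi. The following is a minimal sketch in plain Groovy, not a NiFi API: the helper name canConnect is made up, and the host and port are simply the values from the Connect String above. It only checks that a TCP connection can be opened, not that ZooKeeper itself is healthy.

```groovy
// Hypothetical helper for illustration: returns true if a TCP connection
// to host:port can be opened within timeoutMs milliseconds.
boolean canConnect(String host, int port, int timeoutMs = 3000) {
    def socket = new Socket()
    try {
        socket.connect(new InetSocketAddress(host, port), timeoutMs)
        return true
    } catch (IOException e) {
        // Covers connection refused, timeouts and unresolvable host names
        return false
    } finally {
        socket.close()
    }
}

// Host and port taken from the Connect String property above
println canConnect('wdp.xxx.cn', 2181)
```

If this prints false when run on the NiFi node, the problem is likely at the network or ZooKeeper level rather than in the NiFi configuration.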