博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
把HDFS里的json数据转换成csv格式
阅读量:6366 次
发布时间:2019-06-23

本文共 1783 字,大约阅读时间需要 5 分钟。

 

1. 全景图

 NewImage

 .

2. 用ListHDFS获取所有文件名

NewImage

 

如果想重新再取一次,右健view state:

NewImage

 

点击 clear state, 再运行,即可再次采集数据了。

 

3. 用FetchHDFS 取出json 数据

NewImage

 

4. 用ExecuteScript 转换

NewImage

import org.apache.commons.io.IOUtils

import java.nio.charset.*
import java.text.SimpleDateFormat
import groovy.json.*

def flowFile = session.get()

flowFile = session.write(flowFile, {inputStream, outputStream ->

def js = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

def data = new JsonSlurper().parseText( js )
def columns = data.data*.keySet().flatten().unique()

// Wrap strings in double quotes, and remove nulls

def encode = { e -> e == null ? '' : e instanceof String ? /"$e"/ : "$e" }

// Print all the column names

def columnName = columns.collect { c -> encode( c ) }.join( ',' )

// Then create all the rows

def columnData = data.data.collect { row ->
// A row at a time
columns.collect { colName -> encode( row[ colName ] ) }.join( ',' )
}.join( '\n' )

StringBuilder cd = new StringBuilder()

cd.append(columnName + "\n")
cd.append(columnData)

outputStream.write(cd.toString().getBytes(StandardCharsets.UTF_8))

}as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)

 

参考:

 

5. 用PutHDFS 插入

NewImage

 

问题:

最近加了cluster,发现listhdfs不能取到数据了:

NewImage

 

查看日志:

NewImage

 

发现日志里提到了zookeeper导致connection refused

 

nifi设置成cluster必须走zookeeper来调度资源,所以必须要连上我们的zookeeper server,有一个配置要加

conf/state-management.xml里面有个配置

<cluster-provider>

<id>zk-provider</id>
<class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
<property name="Connect String">wdp.xxx.cn:2181</property>
<property name="Root Node">/nifi</property>
<property name="Session Timeout">30 seconds</property>
<property name="Access Control">CreatorOnly</property>
<property name="Username">nifi</property>
<property name="Password">nifi</property>
</cluster-provider>

 

 

NIFI 中国社区 QQ群:595034369

 

 

转载地址:http://ebrma.baihongyu.com/

你可能感兴趣的文章
Flutter组件学习(四)—— 布局组件Row和Column
查看>>
Runtime在实际开发中的应用
查看>>
优化技巧二、OC开发中常用的tips
查看>>
Android视频开发进阶(part3-Android的Media API)
查看>>
PHP 性能追踪及分析工具(XHPROF)
查看>>
函数式编程之Compose
查看>>
Angular 5 来了!
查看>>
语义化版本控制模块-Semver
查看>>
UML 剖析(2) —— 类图关联和时序图
查看>>
阿里云上到底能运行SAP哪些产品?
查看>>
对不起!冷到你!
查看>>
js 两种常用的类型
查看>>
iOS 用RunTime重写KVO&lt;附Demo&gt;
查看>>
Java Socket 之 TCP Socket
查看>>
作为过来人,我想告诉准程序员的那些事
查看>>
jmeter压力测试,案例实讲
查看>>
CSS盒模型
查看>>
ES6学习笔记之let和const
查看>>
源码的魅力 - HashMap 的工作原理
查看>>
使用travis-ci自动部署Hexo到github和coding
查看>>