Writing Kafka data to HDFS in Java
Author: 我是女孩
This article walks through writing data from Kafka into HDFS with Java. It is offered as a practical reference; if anything is wrong or incomplete, corrections are welcome.
Writing Kafka data to HDFS in Java
For Kafka installation, see my earlier article:
https://www.jb51.net/server/2968144y7.htm
When writing files to HDFS, the console may print the following error:
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=s00356746, access=WRITE, inode="/user":root:supergroup:drwxr-xr-x
The message makes it clear that the user currently running the application does not have write permission on the HDFS "/user" directory. This problem comes up frequently whether the job is submitted from Windows or from Linux.
If you are on EulerOS (openEuler)
handle the account files first; none of them may carry the immutable (i) attribute:
chattr -i /etc/passwd
chattr -i /etc/shadow
chattr -i /etc/group
chattr -i /etc/passwd-
chattr -i /etc/shadow-
chattr -i /etc/group-
lsattr /etc/passwd*
The lsattr output must not show the i attribute on any of these files.
If you are on Linux
add the user performing the operation to the supergroup group:
groupadd supergroup
usermod -a -G supergroup s00356746
If you are a Windows user
on the machine hosting the HDFS NameNode, create a new user whose name matches the Windows account performing the operation, then add that user to the supergroup group:
adduser s00356746
groupadd supergroup
usermod -a -G supergroup s00356746
With this in place, subsequent operations of this kind can write files into the HDFS directories belonging to user s00356746 without triggering the exception above.
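Alternatively, the HDFS user can be chosen from the client side instead of changing group membership on the cluster. Below is a minimal sketch, assuming a NameNode at hdfs://10.175.118.105:9000 (the address used later in this article), a target user named hadoop, and a throwaway test path; adjust all three to your environment.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsAsUserCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Option 1: the Hadoop client also honors the HADOOP_USER_NAME system property
        // (set it before the first FileSystem call in this JVM). User name is assumed.
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        // Option 2: pass the user explicitly when obtaining the FileSystem handle.
        FileSystem fs = FileSystem.get(URI.create("hdfs://10.175.118.105:9000"), conf, "hadoop");

        // Quick write to confirm permissions are in order (hypothetical test path).
        Path dst = new Path("/user/hadoop/permission-check.txt");
        fs.create(dst).close();
        fs.close();
    }
}

The commented-out block in KafkaToHdfs.main further down takes the same approach by reading HADOOP_USER_NAME from the environment.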
Producer code
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer {

    private final Producer<String, String> producer;
    public final static String TOPIC = "test";

    private KafkaProducer() {
        Properties props = new Properties();
        // Kafka broker list (host:port)
        props.put("metadata.broker.list", "10.175.118.105:9092");
        // Serializer class for message values
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Serializer class for message keys
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // Wait for acknowledgement from all in-sync replicas
        props.put("request.required.acks", "-1");
        producer = new Producer<String, String>(new ProducerConfig(props));
    }

    void produce() {
        int messageNo = 1000;
        final int COUNT = 10000;
        while (messageNo < COUNT) {
            String key = String.valueOf(messageNo);
            String data = "hello kafka message " + key;
            producer.send(new KeyedMessage<String, String>(TOPIC, key, data));
            System.out.println(data);
            messageNo++;
        }
    }

    public static void main(String[] args) {
        new KafkaProducer().produce();
    }
}
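The producer above uses the legacy Scala client under kafka.javaapi.producer, which has been removed from recent Kafka releases. As a rough equivalent only, the same loop against the newer org.apache.kafka.clients producer (Kafka 0.9+) could look like the sketch below; the broker address and topic are carried over from the example above, and the class name NewApiProducer is just a placeholder.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewApiProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same broker as in the article; adjust to your environment.
        props.put("bootstrap.servers", "10.175.118.105:9092");
        // Equivalent of request.required.acks = -1
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int messageNo = 1000; messageNo < 10000; messageNo++) {
            String key = String.valueOf(messageNo);
            String data = "hello kafka message " + key;
            producer.send(new ProducerRecord<String, String>("test", key, data));
        }
        producer.close();
    }
}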
Writing from Kafka to HDFS
package com.huawei.hwclouds.dbs.ops.huatuo.diagnosis.service.impl;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class KafkaToHdfs extends Thread {

    private static String kafkaHost = null;
    private static String kafkaGroup = null;
    private static String kafkaTopic = null;
    private static String hdfsUri = null;
    private static String hdfsDir = null;
    private static String hadoopUser = null;
    private static Boolean isDebug = false;
    private ConsumerConnector consumer = null;
    private static Configuration hdfsConf = null;
    private static FileSystem hadoopFS = null;

    public static void main(String[] args) {
        // The original design read the parameters from the command line and the
        // HADOOP_USER_NAME environment variable; the block is kept here for reference.
        // if (args.length < 6) {
        //     useage();
        //     System.exit(0);
        // }
        // Map<String, String> user = System.getenv();
        // if (user.get("HADOOP_USER_NAME") == null) {
        //     System.out.println("Please set the HADOOP_USER_NAME environment variable to the user Hadoop runs as");
        //     System.exit(0);
        // } else {
        //     hadoopUser = user.get("HADOOP_USER_NAME");
        // }
        hadoopUser = "hadoop";
        init(args);
        System.out.println("Starting service...");
        hdfsConf = new Configuration();
        try {
            hdfsConf.set("fs.defaultFS", hdfsUri);
            hdfsConf.set("dfs.support.append", "true");
            hdfsConf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
            hdfsConf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
        } catch (Exception e) {
            System.out.println(e);
        }
        // Create the target directory up front
        try {
            hadoopFS = FileSystem.get(hdfsConf);
            // If the corresponding HDFS directory does not exist yet, create it
            if (!hadoopFS.exists(new Path("/" + hdfsDir))) {
                hadoopFS.mkdirs(new Path("/" + hdfsDir));
            }
            hadoopFS.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        KafkaToHdfs selfObj = new KafkaToHdfs();
        selfObj.start();
        System.out.println("Service started, consumer loop is running");
    }

    public void run() {
        Properties props = new Properties();
        props.put("zookeeper.connect", kafkaHost);
        props.put("group.id", kafkaGroup);
        props.put("zookeeper.session.timeout.ms", "10000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        props.put("format", "binary");
        props.put("auto.commit.enable", "true");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        this.consumer = Consumer.createJavaConsumerConnector(consumerConfig);

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(kafkaTopic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(kafkaTopic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            String tmp = new String(it.next().message());
            // Make sure every message ends with a newline before it is appended
            String fileContent = tmp.endsWith("\n") ? tmp : tmp + "\n";
            debug("receive: " + fileContent);
            try {
                hadoopFS = FileSystem.get(hdfsConf);
                // One file per day, named yyyy-MM-dd.txt, inside the target directory
                String fileName = "/" + hdfsDir + "/"
                        + new SimpleDateFormat("yyyy-MM-dd").format(Calendar.getInstance().getTime()) + ".txt";
                Path dst = new Path(fileName);
                if (!hadoopFS.exists(dst)) {
                    FSDataOutputStream output = hadoopFS.create(dst);
                    output.close();
                }
                InputStream in = new ByteArrayInputStream(fileContent.getBytes("UTF-8"));
                OutputStream out = hadoopFS.append(dst);
                // copyBytes with close=true closes both streams when it finishes
                IOUtils.copyBytes(in, out, 4096, true);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    hadoopFS.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }

    private static void init(String[] args) {
        // NOTE: the connection parameters are hard-coded here; the usage message
        // below describes the originally intended command-line form.
        kafkaHost = "10.175.118.105:2182";
        kafkaGroup = "test-consumer-group";
        kafkaTopic = "test";
        hdfsUri = "hdfs://10.175.118.105:9000";
        hdfsDir = "shxsh";
        if (args.length > 5 && args[5].equals("true")) {
            isDebug = true;
        }
        debug("Service parameters initialized:");
        debug("KAFKA_HOST: " + kafkaHost);
        debug("KAFKA_GROUP: " + kafkaGroup);
        debug("KAFKA_TOPIC: " + kafkaTopic);
        debug("HDFS_URI: " + hdfsUri);
        debug("HDFS_DIRECTORY: " + hdfsDir);
        debug("HADOOP_USER: " + hadoopUser);
        debug("IS_DEBUG: " + isDebug);
    }

    private static void debug(String str) {
        if (isDebug) {
            System.out.println(str);
        }
    }

    private static void useage() {
        System.out.println("* Usage of the Kafka-to-HDFS Java tool");
        System.out.println("# java -cp kafkatohdfs.jar KafkaToHdfs KAFKA_HOST KAFKA_GROUP KAFKA_TOPIC HDFS_URI HDFS_DIRECTORY IS_DEBUG");
        System.out.println("* Parameters:");
        System.out.println("* KAFKA_HOST : Kafka host name or IP:port (the ZooKeeper connect string), e.g. namenode:2181,datanode1:2181,datanode2:2181");
        System.out.println("* KAFKA_GROUP : Kafka consumer group, e.g. test-consumer-group");
        System.out.println("* KAFKA_TOPIC : Kafka topic name, e.g. usertags");
        System.out.println("* HDFS_URI : HDFS connection URI, e.g. hdfs://namenode:9000");
        System.out.println("* HDFS_DIRECTORY : HDFS directory name, e.g. usertags");
        System.out.println("* Optional parameter:");
        System.out.println("* IS_DEBUG : whether to enable debug output, true or false, default false");
    }
}
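For completeness, here is a rough sketch of the same consume-and-append loop written against the newer org.apache.kafka.clients consumer API (Kafka 0.9+), which talks to the brokers directly instead of ZooKeeper. The broker address is taken from the producer example, and the group, topic, HDFS URI, and directory mirror the hard-coded values in init(); treat it as a starting point, not a drop-in replacement.

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NewApiKafkaToHdfs {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.175.118.105:9092"); // broker, not ZooKeeper
        props.put("group.id", "test-consumer-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest"); // new-API equivalent of "smallest"

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://10.175.118.105:9000");
        conf.set("dfs.support.append", "true");
        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList("test"));
        FileSystem fs = FileSystem.get(conf);
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    String line = record.value().endsWith("\n") ? record.value() : record.value() + "\n";
                    // Same daily file layout as the code above
                    Path dst = new Path("/shxsh/" + new SimpleDateFormat("yyyy-MM-dd").format(new Date()) + ".txt");
                    if (!fs.exists(dst)) {
                        fs.create(dst).close();
                    }
                    InputStream in = new ByteArrayInputStream(line.getBytes("UTF-8"));
                    OutputStream out = fs.append(dst);
                    IOUtils.copyBytes(in, out, 4096, true); // closes both streams
                }
            }
        } finally {
            consumer.close();
            fs.close();
        }
    }
}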
Summary
The above is based on my personal experience. I hope it serves as a useful reference, and I hope you will continue to support 脚本之家 (jb51.net).