当前位置: 首页 > 工具软件 > Alluxio > 使用案例 >

ALLuxio

湛宝
2023-12-01

一、什么是Alluxio
Alluxio(之前名为Tachyon)是世界上第一个以内存为中心的虚拟的分布式存储系统。它统一了数据访问的方式,为上层计算框架和底层存储系统构建了桥梁。应用只需要连接Alluxio即可访问存储在底层任意存储系统中的数据。此外,Alluxio的以内存为中心的架构使得数据的访问速度能比现有常规方案快几个数量级。

在大数据生态系统中,Alluxio介于计算框架(如Apache Spark,Apache MapReduce,Apache HBase,Apache Hive,Apache Flink)和现有的存储系统(如Amazon S3,OpenStack Swift,GlusterFS,HDFS,MaprFS,Ceph,NFS,OSS)之间。Alluxio为大数据软件栈带来了显著的性能提升。Alluxio与Hadoop是兼容的。现有的数据分析应用,如Spark和MapReduce程序,可以不修改代码直接在Alluxio上运行。

二、Alluxio应用
比如:分布式内存文件系统Alluxio, Alluxio是一个分布式内存文件系统,可以在集群里以访问内存的速度来访问存在Alluxio里的文件。把Alluxio是架构在最底层的分布式文件存储和上层的各种计算框架之间的一种中间件,其前身为Tachyon。

二、安装Alluxio
docker安装

# Launch the Alluxio master
$ docker run -d \
          -p 19999:19999 \
          --net=alluxio_nw \
          --name=alluxio_master \
          -v ufs:/opt/alluxio/underFSStorage \
          alluxio/alluxio master
# Launch the Alluxio worker
$ docker run -d \
          --net=alluxio_nw \
          --name=alluxio_worker \
          --shm-size=1G -e ALLUXIO_WORKER_MEMORY_SIZE=1G \
          -v ufs:/opt/alluxio/underFSStorage \
          -e ALLUXIO_MASTER_HOSTNAME=alluxio_master \
          alluxio/alluxio worker

三、Alluxio与springboot应用

依赖 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.citydo</groupId>
    <artifactId>alluxio</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>alluxio</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.alluxio</groupId>
            <artifactId>alluxio-core-client-fs</artifactId>
            <version>1.8.1</version>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.blink</groupId>
            <artifactId>flink-shaded-hadoop2</artifactId>
            <version>1.5.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

控制层 controller

package com.citydo.alluxio.controller;

import alluxio.AlluxioURI;
import alluxio.client.file.FileInStream;
import alluxio.client.file.FileOutStream;
import alluxio.client.file.FileSystem;
import alluxio.exception.AlluxioException;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;


@RestController
@RequestMapping("/v1/alluxio")
public class AlluxioController {


    @RequestMapping("/in")
    public FileInStream in() throws IOException, AlluxioException {
        FileSystem fs = FileSystem.Factory.get();
        AlluxioURI path = new AlluxioURI("/myFile");
        // Open the file for reading
        FileInStream in = fs.openFile(path);
        //CreateFileOptions options = CreateFileOptions.defaults().setBlockSize(128 * Constants.MB);
        //FileOutStream out = fs.createFile(path, options);
        // Read data
        in.read(null);
        // Close file relinquishing the lock
        in.close();
        return  in;
    }


    @RequestMapping("/out")
    public FileOutStream out() throws IOException, AlluxioException {
        FileSystem fs = FileSystem.Factory.get();
        AlluxioURI path = new AlluxioURI("/myFile");
        // Create a file and get its output stream
        FileOutStream out = fs.createFile(path);
        // Write data
        out.write(null);
        // Close and complete file
        out.close();
        return out;
    }

    //参考https://docs.alluxio.io/os/javadoc/stable/index.html
}

工具类 AlluxioFsUitls

package com.citydo.alluxio.utils;

import alluxio.AlluxioURI;
import alluxio.client.ReadType;
import alluxio.client.WriteType;
import alluxio.client.file.FileSystem;
import alluxio.client.file.URIStatus;
import alluxio.client.file.options.CreateFileOptions;
import alluxio.client.file.options.OpenFileOptions;
import alluxio.exception.AlluxioException;

import java.io.*;
import java.util.ArrayList;
import java.util.List;

public class AlluxioFsUitls {
	// 获取文件系统FileSystem
	private static final FileSystem fs = FileSystem.Factory.get();

	/**
	 * 此方法用于添加挂载点
	 *
	 * @param alluxioFilePath
	 *            文件路径
	 */
	public static void mount(String alluxioFilePath, String underFileSystemPath) {
		// 1.创建文件路径 AlluxioURI
		AlluxioURI apath = new AlluxioURI(alluxioFilePath);
		AlluxioURI upath = new AlluxioURI(underFileSystemPath);
		try {
			// 2.添加挂载点
			if (!fs.exists(apath)) {
				fs.mount(apath, upath);
			}
		} catch (IOException e) {
			e.printStackTrace();
		} catch (AlluxioException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 此方法用于删除挂载点
	 *
	 * @param filePath
	 *            文件路径
	 */
	public static void unmount(String filePath) {
		// 1.创建文件路径 AlluxioURI
		AlluxioURI path = new AlluxioURI(filePath);
		try {
			// 2.删除挂载点
			if (fs.exists(path)) {
				fs.unmount(path);
			}
		} catch (IOException e) {
			e.printStackTrace();
		} catch (AlluxioException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 此方法用于创建文件,并向文件中输出内容WriteType.ASYNC_THROUGH
	 * 数据被同步地写入到Alluxio的Worker,并异步地写入到底层存储系统。处于实验阶段。
	 *
	 * @param filePath
	 *            文件路径
	 * @param contents
	 *            向文件中输出的内容
	 * @return 文件创建,是否成功
	 */
	public static boolean createFileMustAsysncThroughWriteTpye(String filePath, List<String> contents) {
		return createFile(filePath, contents, CreateFileOptions.defaults().setWriteType(WriteType.ASYNC_THROUGH));
	}

	/**
	 * 此方法用于创建文件,并向文件中输出内容WriteType.CACHE_THROUGH
	 * 数据被同步地写入到Alluxio的Worker和底层存储系统。
	 *
	 * @param filePath
	 *            文件路径
	 * @param contents
	 *            向文件中输出的内容
	 * @return 文件创建,是否成功
	 */
	public static boolean createFileMustCacheThroughWriteTpye(String filePath, List<String> contents) {
		return createFile(filePath, contents, CreateFileOptions.defaults().setWriteType(WriteType.CACHE_THROUGH));
	}

	/**
	 * 此方法用于创建文件,并向文件中输出内容WriteType.THROUGH
	 * 数据被同步地写入到底层存储系统。但不会被写入到Alluxio的Worker。
	 *
	 * @param filePath
	 *            文件路径
	 * @param contents
	 *            向文件中输出的内容
	 * @return 文件创建,是否成功
	 */
	public static boolean createFileMustThroughWriteTpye(String filePath, List<String> contents) {
		return createFile(filePath, contents, CreateFileOptions.defaults().setWriteType(WriteType.THROUGH));
	}

	/**
	 * 此方法用于创建文件,并向文件中输出内容WriteType.MUST_CACHE
	 * 数据被同步地写入到Alluxio的Worker。但不会被写入到底层存储系统。这是默认写类型。
	 *
	 * @param filePath
	 *            文件路径
	 * @param contents
	 *            向文件中输出的内容
	 * @return 文件创建,是否成功
	 */
	public static boolean createFileMustCacheWriteTpye(String filePath, List<String> contents) {
		return createFile(filePath, contents, CreateFileOptions.defaults().setWriteType(WriteType.MUST_CACHE));
	}

	/**
	 * 方法用于创建文件,并向文件中输出内容
	 *
	 * @param filePath
	 *            文件路径
	 * @param contents
	 *            向文件中输出的内容
	 * @param options
	 *            CreateFileOptions
	 * @return 文件创建,是否成功
	 */
	public static boolean createFile(String filePath, List<String> contents, CreateFileOptions options) {
		// 1.创建文件路径 AlluxioURI
		AlluxioURI path = new AlluxioURI(filePath);
		BufferedWriter writer = null;
		try {
			// 2.打开文件输出流,使用BufferedWriter输出
			if (!fs.exists(path)) {
				writer = new BufferedWriter(new OutputStreamWriter(fs.createFile(path, options)));
				// 3.输出文件内容
				for (String line : contents) {
					writer.write(line);
					writer.newLine();
				}
			}
			// 3.如果文件存在,则表示执行成功
			return fs.exists(path);

		} catch (IOException e) {
			e.printStackTrace();
		} catch (AlluxioException e) {
			e.printStackTrace();
		} finally {
			try {
				// 4.关闭输入流,释放资源
				if (writer != null) {
					writer.close();
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		return false;
	}

	/**
	 * 此方法用于读取alluxio文件ReadType.CACHE_PROMOTE
	 * 如果读取的数据在Worker上时,该数据被移动到Worker的最高层。如果该数据不在本地Worker的Alluxio存储中,
	 * 那么就将一个副本添加到本地Alluxio Worker中,用于每次完整地读取数据块。这是默认的读类型。
	 *
	 * @param filePath
	 *            文件路径
	 * @return 文件的内容
	 */
	public static List<String> openFilePromoteCacheReadType(String filePath) {
		return openFile(filePath, OpenFileOptions.defaults().setReadType(ReadType.CACHE_PROMOTE));
	}

	/**
	 * 此方法用于读取alluxio文件ReadType.NO_CACHE 不会创建副本
	 *
	 * @param filePath
	 *            文件路径
	 * @return 文件的内容
	 */
	public static List<String> openFileNoCacheReadType(String filePath) {
		return openFile(filePath, OpenFileOptions.defaults().setReadType(ReadType.NO_CACHE));
	}

	/**
	 * 此方法用于读取alluxio文件ReadType.CACHE
	 * 如果该数据不在本地Worker的Alluxio存储中,那么就将一个副本添加到本地Alluxio Worker中, 用于每次完整地读取数据块。
	 *
	 * @param filePath
	 *            文件路径
	 * @return 文件的内容
	 */
	public static List<String> openFileCacheReadType(String filePath) {
		return openFile(filePath, OpenFileOptions.defaults().setReadType(ReadType.CACHE));
	}

	/**
	 * 此方法用于读取alluxio文件DefalutReadType
	 *
	 * @param filePath
	 *            文件路径
	 * @return 文件的内容
	 */
	public static List<String> openFileDefalutReadType(String filePath) {
		return openFile(filePath, OpenFileOptions.defaults());
	}

	/**
	 * 此方法用于读取alluxio文件
	 *
	 * @param filePath
	 *            文件路径
	 * @param options
	 *            文件读取选项
	 * @return 文件的内容
	 */
	public static List<String> openFile(String filePath, OpenFileOptions options) {
		List<String> list = new ArrayList<String>();
		// 1.创建文件路径 AlluxioURI
		AlluxioURI path = new AlluxioURI(filePath);
		BufferedReader reader = null;
		try {
			// 2.打开文件输入流,使用 BufferedReader按行读取
			if (fs.exists(path)) {
				reader = new BufferedReader(new InputStreamReader(fs.openFile(path, options)));
				for (String line = null; (line = reader.readLine()) != null;) {
					list.add(line);
				}
				return list;
			}
		} catch (IOException e) {
			e.printStackTrace();
		} catch (AlluxioException e) {
			e.printStackTrace();
		} finally {
			try {
				// 3.关闭输入流,释放资源
				if (reader != null) {
					reader.close();
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		return list;
	}

	/**
	 * 此方法用于释放alluxio中的文件或路径
	 *
	 * @param filePath
	 *            文件路径
	 * @return 释放文件, 是否成功
	 */
	public static boolean free(String filePath) {
		// 1.创建文件路径 AlluxioURI
		AlluxioURI path = new AlluxioURI(filePath);
		try {
			// 2.释放文件
			if (fs.exists(path)) {
				fs.free(path);
			}
			// 3.判定文件是否不存在,如果不存在则删除成功!
			return !fs.exists(path);
		} catch (IOException e) {
			e.printStackTrace();
		} catch (AlluxioException e) {
			e.printStackTrace();
		}
		return false;
	}

	/**
	 * 此方法用于删除文件或路径
	 *
	 * @param filePath
	 *            文件路径
	 * @return 删除文件, 是否成功
	 */
	public static boolean delete(String filePath) {
		// 1.创建文件路径 AlluxioURI
		AlluxioURI path = new AlluxioURI(filePath);
		try {
			// 2.删除文件
			if (fs.exists(path)) {
				fs.delete(path);
			}
			// 3.判定文件是否不存在,如果不存在则删除成功!
			return !fs.exists(path);
		} catch (IOException e) {
			e.printStackTrace();
		} catch (AlluxioException e) {
			e.printStackTrace();
		}
		return false;
	}

	/**
	 * 此方法用于创建文件夹
	 *
	 * @param dirPath
	 *            文件夹路径
	 * @return 创建文件夹, 是否成功
	 */
	public static boolean createDirectory(String dirPath) {
		// 1.创建文件路径 AlluxioURI
		AlluxioURI path = new AlluxioURI(dirPath);
		try {
			// 2.创建文件夹
			if (!fs.exists(path)) {
				fs.createDirectory(path);
			}
			// 3.再次判定文件夹是否存在,来确定文件夹是否创建成功
			return fs.exists(path);
		} catch (IOException e) {
			e.printStackTrace();
		} catch (AlluxioException e) {
			e.printStackTrace();
		}
		return false;
	}

	/**
	 * 此方法用于获取文件状态信息
	 *
	 * @param filePath
	 *            文件路径
	 * @return List<URIStatus>
	 */
	public static List<URIStatus> listStatus(String filePath) {
		// 1.创建文件路径 AlluxioURI
		AlluxioURI path = new AlluxioURI(filePath);
		try {
			// 2.获取文件状态信息
			if (fs.exists(path)) {
				return fs.listStatus(path);
			}
		} catch (IOException e) {
			e.printStackTrace();
		} catch (AlluxioException e) {
			e.printStackTrace();
		}
		return null;
	}

	/**
	 * 此方法用于获取文件状态信息
	 *
	 * @param filePath
	 *            文件路径
	 * @return URIStatus
	 */
	public static URIStatus getStatus(String filePath) {
		// 1.创建文件路径 AlluxioURI
		AlluxioURI path = new AlluxioURI(filePath);
		try {
			// 2.获取文件状态信息
			if (fs.exists(path)) {
				return fs.getStatus(path);
			}
		} catch (IOException e) {
			e.printStackTrace();
		} catch (AlluxioException e) {
			e.printStackTrace();
		}
		return null;
	}

	/**
	 * 此方法用于判定文件是否存在
	 *
	 * @param filePath
	 *            文件路径
	 * @return 文件是否存在
	 */
	public static boolean exists(String filePath) {
		// 1.创建文件路径 AlluxioURI
		AlluxioURI path = new AlluxioURI(filePath);
		try {
			// 2.获取文件状态信息
			return fs.exists(path);
		} catch (IOException e) {
			e.printStackTrace();
		} catch (AlluxioException e) {
			e.printStackTrace();
		}
		return false;
	}

	/**
	 * 此方法用于重命名文件
	 *
	 * @param sourcePath
	 *            原文件路径
	 * @param distPath
	 *            目的文件路径
	 * @return 重命名是否成功
	 */
	public static boolean rename(String sourcePath, String distPath) {
		// 1.创建文件路径 AlluxioURI
		AlluxioURI sourcepath = new AlluxioURI(sourcePath);
		AlluxioURI distpath = new AlluxioURI(distPath);
		try {
			// 2.重命名操作
			if (fs.exists(sourcepath)) {
				fs.rename(sourcepath, distpath);
			}
			// 3.判定目标文件是否存在,来判定重命名是否成功
			return ((fs.exists(distpath)) && (!fs.exists(sourcepath)));
		} catch (IOException e) {
			e.printStackTrace();
		} catch (AlluxioException e) {
			e.printStackTrace();
		}
		return false;
	}
}

四、性能对比
万级表
1.2G (2千万)

presto sqlHDFS(s)Alluxio(s)
count52
distinct + where3~101
group by21

亿级表
tableB 900G (45亿)

presto sqlHDFS(s)Alluxio(s)
count23~6911~14
distinct + where50~9520~21
group by76~15777~90

参考:https://www.alluxio.io/download/
参考:https://blog.csdn.net/lsshlsw/article/details/85690841
参考:https://docs.alluxio.io/os/user/stable/cn/Getting-Started.html

 类似资料:

相关阅读

相关文章

相关问答