当前位置: 首页 > 工具软件 > Apache Ignite > 使用案例 >

spring程序使用三种方式在apache ignite存取数据

翟奇
2023-12-01


本文基于分布式内存数据库ignite的2.12.0版本,展示使用自动序列化、BinaryObjectBuilder、sql三种方式存取数据的demo

创建数据表 (cache)

准备集群,集群server节点需要使用四个端口:
10800 (JDBC/ODBC), 11211 (TCP connector), 47100 (listener), 47500 (discovery)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd">
    <!-- Server node configuration: instance name, user attributes, events, SQL schemas,
         one predefined cache, the default data region, discovery and communication. -->
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
         <property name="igniteInstanceName" value="test-grid-xx1-yy1"/>
         <property name="userAttributes">
             <map>
                 <entry key="rack" value="xx1"/>
             </map>
         </property>
        <!-- Enable peer class loading. -->
        <property name="peerClassLoadingEnabled" value="true"/>
        <!-- Set deployment mode. -->
        <property name="deploymentMode" value="ISOLATED"/>
        <property name="authenticationEnabled" value="true"/>
        <!-- Event types recorded by this node: task execution events and cache object events. -->
        <property name="includeEventTypes">
            <list>
                <!--Task execution events-->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>

                <!--Cache events-->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
            </list>
        </property>
        <!-- SQL schemas declared up front; DEV is the one used by the table examples. -->
        <property name="sqlConfiguration">
            <bean class="org.apache.ignite.configuration.SqlConfiguration">
                <property name="sqlSchemas">
                    <list>
                        <value>DEV</value>
                        <value>SIT</value>
                        <value>UAT</value>
                        <value>VER</value>
                    </list>
                </property>
            </bean>
        </property>
        <property name="cacheConfiguration">
            <list>
                <!-- Partitioned, transactional cache named "test" with one backup copy. -->
                <bean class="org.apache.ignite.configuration.CacheConfiguration">
                    <property name="name" value="test"/>
                    <property name="backups" value="1"/>
                    <property name="cacheMode" value="PARTITIONED"/>
                    <property name="atomicityMode" value="TRANSACTIONAL"/>
                    <!-- Group the cache belongs to. -->
                    <property name="groupName" value="DEV"/>
                </bean>
            </list>
        </property>
        <property name="dataStorageConfiguration">
            <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
                <!--
                    Default data region. A cache is bound to this region unless it sets
                    another one in its CacheConfiguration. Persistence is enabled and the
                    region size is bounded by initialSize / maxSize below.
                -->
                <property name="defaultDataRegionConfiguration">
                    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                        <property name="name" value="Default_Region"/>
                        <property name="persistenceEnabled" value="true"/>
                        <!-- Region starts at 100 MiB and is capped at 100 GiB. -->
                        <property name="initialSize" value="#{100L * 1024 * 1024}"/>
                        <property name="maxSize" value="#{100L * 1024 * 1024 * 1024}"/>
                        <!-- Enabling SEGMENTED_LRU page replacement for this region.  -->
                        <property name="pageReplacementMode" value="SEGMENTED_LRU"/>
                        <property name="metricsEnabled" value="true"/>
                        <!-- Warm-up strategy for this region: LoadAllWarmUpConfiguration. -->
                        <property name="warmUpConfiguration">
                            <bean class="org.apache.ignite.configuration.LoadAllWarmUpConfiguration"/>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
        
        <!--
        Explicitly configure TCP discovery SPI: local bind address and port range,
        timeouts / reconnect settings, and the static list of peer addresses.
        -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="localAddress" value="10.0.0.1"/>
                <property name="localPort" value="47500"/>
                <property name="localPortRange" value="20"/>
                <property name="ackTimeout" value="#{3L * 1000}"/>
                <property name="reconnectDelay" value="2000"/>
                <property name="reconnectCount" value="5"/>                
                <property name="connectionRecoveryTimeout" value="#{60L * 1000}"/>

                <!-- Setting up IP finder for this cluster -->
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="shared" value="true"/>
                        <property name="addresses">
                            <list>
                                <value>10.0.0.2:47500..47520</value>
                                <value>10.0.0.3:47500..47520</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>

        <!--
        Explicitly configure TCP communication SPI: local port range starting at 47100,
        connect / idle timeouts, reconnect count and per-node connection settings.
        -->
        <property name="communicationSpi">
            <bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
                <property name="localPort" value="47100"/>
                <property name="localPortRange" value="20"/>
                <property name="connectTimeout" value="#{5L * 1000}"/>
                <property name="reconnectCount" value="12"/>
                <property name="idleConnectionTimeout" value="#{60L * 1000}"/>
                <property name="usePairedConnections" value="true"/>
                <property name="connectionsPerNode" value="3"/>
            </bean>
        </property>
    </bean>
</beans>

登录到ignite,创建用户

./sqlline.sh --verbose=true -u 'jdbc:ignite:thin://10.0.0.1:10800,10.0.0.2:10800/SYS'
# The default administrator account is ignite:ignite; normally you create a custom user instead
CREATE USER "wzp" WITH PASSWORD 'wzp';
# Commonly used operational system views: https://ignite.apache.org/docs/latest/monitoring-metrics/system-views
select * from SYS.NODES;
select * from SYS.CACHES;
select * from SYS.SCHEMAS;
select * from SYS.TABLES;
select * from SYS.TABLE_COLUMNS;
select * from SYS.BINARY_METADATA;

使用create table来创建cache,可以避免部署自定义类到server节点并支持使用二级索引;
table对应的cache的名称为SQL_<SCHEMA_NAME>_<TABLE_NAME>,即:SQL_DEV_PERSON

-- Creates the PERSON table in schema DEV, keyed by id.
-- WITH options: PARTITIONED template, cache group DEV, one backup, TRANSACTIONAL atomicity,
-- and a value type name equal to the Java class com.wzp.ignite.bo.Person.
CREATE TABLE IF NOT EXISTS DEV.PERSON (
  id int,
  city_id int,
  name varchar,
  age int,
  company varchar,
  PRIMARY KEY (id)
) WITH "TEMPLATE=PARTITIONED,CACHE_GROUP=DEV,BACKUPS=1,ATOMICITY=TRANSACTIONAL,VALUE_TYPE=com.wzp.ignite.bo.Person";

初始化spring bean

	/**
	 * Builds and starts the thin client used by the demo.
	 *
	 * <p>The client points at the two server endpoints on port 10800, logs in as user
	 * {@code wzp}, has partition awareness switched off, and carries default transaction
	 * settings (timeout value 10000, OPTIMISTIC concurrency, REPEATABLE_READ isolation).
	 *
	 * @return the started {@link IgniteClient}
	 */
	@Bean
	public IgniteClient igniteClient() {
		// Defaults applied to transactions opened through this client.
		ClientTransactionConfiguration txDefaults = new ClientTransactionConfiguration();
		txDefaults.setDefaultTxTimeout(10000);
		txDefaults.setDefaultTxConcurrency(TransactionConcurrency.OPTIMISTIC);
		txDefaults.setDefaultTxIsolation(TransactionIsolation.REPEATABLE_READ);

		ClientConfiguration clientCfg = new ClientConfiguration();
		clientCfg.setAddresses("10.0.0.1:10800", "10.0.0.2:10800");
		clientCfg.setUserName("wzp");
		clientCfg.setUserPassword("wzp");
		clientCfg.setPartitionAwarenessEnabled(false);
		clientCfg.setTransactionConfiguration(txDefaults);

		return Ignition.startClient(clientCfg);
	}

	/**
	 * Registers a Spring transaction manager, under the bean name
	 * {@code igniteTransactionManager}, that works on top of the thin client.
	 *
	 * @param cli the thin client handed to the manager
	 * @return the transaction manager with its client instance set
	 */
	@Bean("igniteTransactionManager")
	public IgniteClientSpringTransactionManager igniteTransactionManager(IgniteClient cli) {
		final IgniteClientSpringTransactionManager txManager = new IgniteClientSpringTransactionManager();
		txManager.setClientInstance(cli);
		return txManager;
	}

使用自动序列化机制 (Key-Value API)

ignite实质是key-value数据库,会自动使用BinaryObject格式把value(POJO对象)进行序列化后存储

package com.wzp.ignite.bo;

/**
 * Value object for rows of the PERSON table; its fully qualified name is what the
 * table definition references as VALUE_TYPE.
 *
 * <p>Field names mirror the table's column names so that values written through the
 * key-value API line up with the SQL columns.
 */
@lombok.Data
public class Person implements java.io.Serializable {
	private static final long serialVersionUID = 1L;

	private int id;
	private String name;
	private int age;
	private String company;
	// Deliberately snake_case: even with @QuerySqlField configured, automatic
	// serialization does not convert camelCase to snake_case (cityId -> city_id).
	private int city_id;
}

只要保证table定义里的VALUE_TYPE和POJO对象的类名一致,那么sql也能查询到使用Key-Value API存入的POJO数据。如果使用复合主键,那么需要设置好table定义里的KEY_TYPE。

	// Thin client injected by Spring; the data-access methods of this class use it to reach the caches.
	@Autowired
	IgniteClient ignite;

	/**
	 * Writes a sample {@link Person} into the PERSON table's cache through the key-value API.
	 *
	 * @param key cache key; also stored in the person's {@code id} field
	 */
	private void insertObject(int key) {
		final String tableCache = "SQL_" + schemaName + "_" + "PERSON";
		ClientCache<Integer, Person> people = ignite.cache(tableCache);

		Person sample = new Person();
		sample.setId(key);
		sample.setName("Serializable");
		sample.setAge(1);
		sample.setCompany("MIDEA");
		sample.setCity_id(1001);

		people.put(key, sample);
	}

	/**
	 * Reads a {@link Person} from the PERSON table's cache through the key-value API.
	 *
	 * @param key cache key to look up
	 * @return whatever the cache returns for {@code key}
	 */
	private Person getObject(int key) {
		final String tableCache = "SQL_" + schemaName + "_" + "PERSON";
		ClientCache<Integer, Person> people = ignite.cache(tableCache);
		Person found = people.get(key);
		return found;
	}

如需原子性地修改几行数据,在public方法上加上注解:
@Transactional(transactionManager = "igniteTransactionManager")

使用BinaryObjectBuilder (Key-Value API)

如果程序里不存在Person类的定义,也可以使用BinaryObjectBuilder操作Table表数据;
builder的名称需要与table定义里的VALUE_TYPE一致:

	/**
	 * Writes a person row without needing the {@code Person} class, by assembling a
	 * binary object whose type name is {@code com.wzp.ignite.bo.Person}.
	 *
	 * @param key cache key; also stored in the {@code id} field
	 */
	private void insertBinaryObject(int key) {
		final String tableCache = "SQL_" + schemaName + "_" + "PERSON";
		ClientCache<Integer, BinaryObject> rawView = ignite.cache(tableCache).withKeepBinary();

		// Fields are set in the same order as before; only the call style is fluent now.
		BinaryObjectBuilder personBuilder = ignite.binary().builder("com.wzp.ignite.bo.Person")
				.setField("id", key)
				.setField("city_id", 1001)
				.setField("name", "BinaryObjectBuilder")
				.setField("age", 30)
				.setField("company", "MIDEA");

		BinaryObject person = personBuilder.build();
		rawView.put(key, person);
	}

	/**
	 * Reads a row as a {@link BinaryObject} from the cache named by {@code cacheName},
	 * using the keep-binary view of that cache.
	 *
	 * @param key cache key to look up
	 * @return whatever the keep-binary cache returns for {@code key}
	 */
	private BinaryObject getBinaryObject(int key) {
		ClientCache<Integer, BinaryObject> rawView = ignite.cache(cacheName).withKeepBinary();
		BinaryObject stored = rawView.get(key);
		return stored;
	}
	/**
	 * Uses the key-value API on a table with a composite primary key.
	 *
	 * <p>Key and value are both assembled as binary objects; the key's type name has to
	 * match the KEY_TYPE given when the table was defined.
	 *
	 * @param userId value for the key's {@code user_id} field
	 * @param month  value for the key's {@code month} field
	 */
	private void insertBinaryObject(int userId, String month) {
		final String tableCache = "SQL_" + schemaName + "_" + "SALLARY";
		ClientCache<BinaryObject, BinaryObject> rawView = ignite.cache(tableCache).withKeepBinary();

		// Composite key: type name matches the table's KEY_TYPE.
		BinaryObjectBuilder keyParts = ignite.binary().builder("com.wzp.ignite.bo.SallaryKey")
				.setField("user_id", userId)
				.setField("month", month);

		BinaryObjectBuilder valueParts = ignite.binary().builder("com.wzp.ignite.bo.SallaryValue")
				.setField("salary_before_tax", 100000L)
				.setField("tax", 3L);

		BinaryObject compositeKey = keyParts.build();
		BinaryObject row = valueParts.build();
		rawView.put(compositeKey, row);
	}

使用SQL (SQL API)

瘦客户端模式下必须使用SQL API才能在服务端进行数据筛选,并利用到二级索引;
通过setSchema来切换schema

	/**
	 * Inserts a sample person row with a parameterized SQL INSERT, run against the
	 * schema named by {@code schemaName}.
	 *
	 * @param key value bound to the {@code id} column
	 */
	private void insertBySql(int key) {
		ClientCache<Integer, Person> personCache = ignite.cache(cacheName);

		SqlFieldsQuery insert = new SqlFieldsQuery(
				"INSERT INTO person(id, city_id, name, age, company) VALUES(?, ?, ?, ?, ?)");
		insert.setArgs(key, 1001, "SQL", 30, "Midea");
		insert.setSchema(schemaName);

		// The rows returned by getAll() are not needed and are discarded.
		personCache.query(insert).getAll();
	}

	/**
	 * Looks up one person by id with SQL and converts the first result row to a {@link Person}.
	 *
	 * <p>The row is first turned into a column-name to value map, then mapped onto
	 * {@code Person} by Jackson with case-insensitive property matching.
	 *
	 * @param key value bound to the {@code id} parameter of the query
	 * @return the person built from the first row, or {@code null} when the query yields no rows
	 */
	private Person selectBySql(int key) {
		ClientCache<Integer, Person> personCache = ignite.cache(cacheName);

		SqlFieldsQuery lookup = new SqlFieldsQuery("select * from person where id = ?");
		lookup.setArgs(key);
		lookup.setSchema(schemaName);

		try (FieldsQueryCursor<List<?>> rows = personCache.query(lookup)) {
			for (List<?> row : rows) {
				// Pair every value with the name of the column it came from.
				Map<String, Object> columns = new HashMap<>();
				int idx = 0;
				while (idx < row.size()) {
					columns.put(rows.getFieldName(idx), row.get(idx));
					idx++;
				}

				// Let Jackson map the columns onto Person, ignoring property-name case.
				ObjectMapper converter = new ObjectMapper();
				converter.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);

				// Only the first row is converted; any further rows are ignored.
				return converter.convertValue(columns, Person.class);
			}
			return null;
		}
	}
 类似资料: