当前位置: 首页 > 工具软件 > collect > 使用案例 >

python gc模块_Python gc.collect方法代码示例

高吉星
2023-12-01

本文整理汇总了Python中gc.collect方法的典型用法代码示例。如果您正苦于以下问题:Python gc.collect方法的具体用法?Python gc.collect怎么用?Python gc.collect使用的例子?那么恭喜您, 这里精选的方法代码示例或许可以为您提供帮助。您也可以进一步了解该方法所在模块gc的用法示例。

在下文中一共展示了gc.collect方法的29个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: loadW2V

​点赞 6

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def loadW2V(self, emb_path, type="bin"):
    """Load pre-trained word embeddings into ``self.pre_emb``.

    Args:
        emb_path: Path of the embedding file.
        type: ``"textgz"`` for a gzip-compressed text file, ``"text"`` for
            a plain text file; any other value (default ``"bin"``) loads
            the binary word2vec format through ``Word2Vec``. The name
            shadows the builtin but is kept for keyword callers.

    For both text formats every non-blank line is split on whitespace:
    the first token, lower-cased, becomes the key and the remaining
    tokens are stored as a NumPy array in the existing ``self.pre_emb``.
    """
    def _fill_from_lines(lines):
        # Shared parser for both text formats; returns the entry count.
        for line in lines:
            tokens = line.strip().split()
            if not tokens:
                # A blank (e.g. trailing) line carries no vector.
                continue
            self.pre_emb[tokens[0].lower()] = np.asarray(tokens[1:])
        return len(self.pre_emb)

    print("Loading W2V data...")
    num_keys = 0
    if type == "textgz":
        # this seems faster than gensim non-binary load
        with gzip.open(emb_path) as handle:
            num_keys = _fill_from_lines(handle)
    elif type == "text":
        # ``elif`` matters: with a plain ``if`` the "textgz" case fell
        # through to the binary loader below and replaced what was read.
        with open(emb_path) as handle:
            num_keys = _fill_from_lines(handle)
    else:
        self.pre_emb = Word2Vec.load_word2vec_format(emb_path, binary=True)
        self.pre_emb.init_sims(replace=True)
        num_keys = len(self.pre_emb.vocab)
    print("loaded word2vec len ", num_keys)
    gc.collect()

开发者ID:dhwajraj,项目名称:deep-siamese-text-similarity,代码行数:25,

示例2: test_close_auto_generated_session

​点赞 6

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def test_close_auto_generated_session(self, mocker):
    """A client built without an explicit session closes it when collected."""
    import gc
    import requests

    # Setup: patch ``requests.Session`` so it hands back a spec'd mock.
    fake_session = mocker.Mock(spec=requests.Session)
    fake_session.request.return_value = "response"
    mocker.patch("requests.Session").return_value = fake_session

    # Run: send once, then drop the only reference and force a collection.
    client = requests_.RequestsClient()
    client.send(("method", "url", {}))
    del client
    gc.collect()

    # Verify: the session was closed exactly once.
    assert fake_session.close.call_count == 1

开发者ID:prkumar,项目名称:uplink,代码行数:19,

示例3: test_close_auto_created_session

​点赞 6

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def test_close_auto_created_session(self, mocker):
    """The session created by ``AiohttpClient.create`` is closed on collection."""
    import asyncio
    import gc
    import aiohttp

    # Setup: ``aiohttp.ClientSession(...)`` returns a spec'd mock.
    fake_session = mocker.Mock(spec=aiohttp.ClientSession)
    session_factory = mocker.patch("aiohttp.ClientSession")
    session_factory.return_value = fake_session

    args = [1]
    kwargs = {"keyword": 2}

    # Run: create the client, then resolve its session on the event loop.
    # The future is deliberately not bound to a local so nothing in this
    # frame keeps extra references alive past ``del client``.
    client = aiohttp_.AiohttpClient.create(*args, **kwargs)
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(asyncio.ensure_future(client.session()))

    # Verify: session created with the arguments given to ``create``.
    session_factory.assert_called_with(*args, **kwargs)

    del client
    gc.collect()
    session_factory.return_value.close.assert_called_with()

开发者ID:prkumar,项目名称:uplink,代码行数:27,

示例4: _place_electrodes

​点赞 6

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def _place_electrodes(self, fix_th=True):
    """ Add the defined electrodes to a copy of the mesh

    Parameters:
    ------------
    fix_th: bool
        When true (default), ``fix_thin_tetrahedra`` is run on the mesh
        after all electrodes have been added.

    Returns:
    ------------
    w_elec:
        Deep copy of ``self.mesh`` with every electrode added
    electrode_surfaces: list
        One entry per electrode, in the order of ``self.electrode``: the
        second value returned by that electrode's ``add_electrode_to_mesh``
    """
    # Work on a copy so ``self.mesh`` itself is left untouched.
    w_elec = copy.deepcopy(self.mesh)
    w_elec.fix_tr_node_ordering()

    electrode_surfaces = []
    for el in self.electrode:
        logger.info('Placing Electrode:\n{0}'.format(str(el)))
        # Each electrode returns the updated mesh, which is fed to the next.
        w_elec, surface = el.add_electrode_to_mesh(w_elec)
        electrode_surfaces.append(surface)

    w_elec.fix_th_node_ordering()
    w_elec.fix_tr_node_ordering()

    if fix_th:
        logger.info('Improving mesh quality')
        w_elec.fix_thin_tetrahedra()

    gc.collect()
    return w_elec, electrode_surfaces

开发者ID:simnibs,项目名称:simnibs,代码行数:26,

示例5: fix_tr_node_ordering

​点赞 6

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def fix_tr_node_ordering(self):
    ''' Fixes the node ordering of the triangles in-place

    For every triangle (element type 2) that has a corresponding
    tetrahedron, the first two node numbers are swapped when the triangle
    normal has a positive dot product with the vector from the triangle
    barycenter to that tetrahedron's barycenter.
    '''
    tetra_of_triangle = self.find_corresponding_tetrahedra()
    has_tetra = tetra_of_triangle != -1  # -1 marks "no tetrahedron found"

    tri_idx = np.where(self.elm.elm_type == 2)[0][has_tetra]
    tetra_of_triangle = tetra_of_triangle[has_tetra]

    tri_normals = self.triangle_normals().value[tri_idx]
    centers = self.elements_baricenters().value
    # Element numbers are shifted by one to index the barycenter array.
    to_tetra = centers[tetra_of_triangle - 1] - centers[tri_idx]
    alignment = np.einsum('ij, ij -> i', tri_normals, to_tetra)
    flip = tri_idx[alignment > 0]

    # Swap columns 0 and 1 of the selected rows; the fancy-indexed
    # right-hand side is a copy, so the assignment cannot alias itself.
    node_list = self.elm.node_number_list
    swapped = node_list[flip][:, [1, 0]]
    node_list[flip, :2] = swapped
    del swapped
    gc.collect()

开发者ID:simnibs,项目名称:simnibs,代码行数:22,

示例6: unload_group

​点赞 6

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def unload_group(chat_id):
    """Pickle one chat's data to ``markov/chat_<id>.dat`` and evict it.

    On success the chat is removed from ``groups`` and ``gcache`` and the
    module-level ``gc_counter`` is decremented; when the counter drops
    below 1 it is reset to ``gc_every_unload`` and a collection is forced.

    This is best-effort: ordinary errors are logged and swallowed so a
    failed unload never takes the caller down. ``KeyboardInterrupt`` and
    ``SystemExit`` are not ``Exception`` subclasses and propagate.
    """
    global gcache, gc_counter
    import logging

    try:
        # Look the chat up BEFORE opening the file: opening in "wb" mode
        # truncates, so a missing chat would otherwise wipe its saved data.
        group = groups[chat_id]
        with open("markov/chat_" + str(chat_id) + ".dat", "wb") as f:
            pickle.dump(group, f)
        del group
        del groups[chat_id]
        gcache.remove(chat_id)
        gc_counter -= 1
        if gc_counter < 1:
            gc_counter = gc_every_unload
            gc.collect()
    except Exception:
        logging.getLogger(__name__).exception(
            "Could not unload chat %s", chat_id)

开发者ID:39bit,项目名称:Markov_Bot,代码行数:18,

示例7: _wrapper

​点赞 6

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def _wrapper(method, check_steps_end):
    """Wrap ``method`` with the MolerTest start/end checks.

    Args:
        method: Callable to decorate. If it already carries a truthy
            ``_already_decorated`` attribute it is returned unchanged.
        check_steps_end: When truthy, ``MolerTest._check_steps_end()`` is
            called after ``method`` has run.

    Returns:
        The wrapper, marked with ``_already_decorated = True``. It returns
        whatever ``method`` returned.
    """
    if getattr(method, '_already_decorated', False):
        return method

    @wraps(method)
    def wrapped(*args, **kwargs):
        MolerTest._steps_start()
        caught_exception = None
        # Pre-bound so the final ``return`` cannot raise UnboundLocalError
        # when ``method`` raised and the checks below did not re-raise.
        result = None
        try:
            result = method(*args, **kwargs)
        except Exception as exc:
            # Handed to MolerTest below instead of propagating directly.
            caught_exception = exc
        finally:
            MolerTest._check_exceptions_occured(caught_exception)
            if check_steps_end:
                MolerTest._check_steps_end()
        gc.collect()
        return result

    wrapped._already_decorated = True
    return wrapped

开发者ID:nokia,项目名称:moler,代码行数:23,

示例8: test_subscription_doesnt_block_subscriber_to_be_garbage_collected

​点赞 6

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def test_subscription_doesnt_block_subscriber_to_be_garbage_collected():
    """Subscribing to a Publisher must not keep the subscriber alive."""
    from moler.publisher import Publisher

    finalized = []

    class GcSubscriber(object):
        def __del__(self):
            # Records that the finalizer ran.
            finalized.append('Subscriber')

    notifier = Publisher()
    subscr = GcSubscriber()
    notifier.subscribe(subscr)

    # Drop the only reference held by this test, then force a collection.
    del subscr
    gc.collect()

    assert 'Subscriber' in finalized

开发者ID:nokia,项目名称:moler,代码行数:19,

示例9: test_garbage_collected_subscriber_is_not_notified

​点赞 6

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def test_garbage_collected_subscriber_is_not_notified():
    """Only the subscriber that is still referenced receives the data."""
    from moler.publisher import Publisher

    deliveries = []

    class GcSubscriber(object):
        def __call__(self, data):
            deliveries.append(data)

    notifier = Publisher()
    doomed = GcSubscriber()
    survivor = GcSubscriber()
    notifier.subscribe(subscriber=doomed)
    notifier.subscribe(subscriber=survivor)

    # Release one subscriber; ``survivor`` stays referenced by this frame.
    del doomed
    gc.collect()

    notifier.notify_subscribers("data")
    assert len(deliveries) == 1

开发者ID:nokia,项目名称:moler,代码行数:22,

示例10: test_subscription_doesnt_block_subscriber_to_be_garbage_collected

​点赞 6

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def test_subscription_doesnt_block_subscriber_to_be_garbage_collected():
    """Subscribing to a ThreadedMolerConnection must not keep the observer alive."""
    from moler.threaded_moler_connection import ThreadedMolerConnection

    finalized = []

    class Subscriber(object):
        def __del__(self):
            # Records that the finalizer ran.
            finalized.append('Subscriber')

    class CloseSubscriber(object):
        pass

    moler_conn = ThreadedMolerConnection()
    subscr = Subscriber()
    close_subscr = CloseSubscriber()
    moler_conn.subscribe(subscr, close_subscr)

    # Drop the only reference held by this test, then force a collection.
    del subscr
    gc.collect()

    assert 'Subscriber' in finalized

开发者ID:nokia,项目名称:moler,代码行数:23,

示例11: test_garbage_collected_subscriber_is_not_notified

​点赞 6

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def test_garbage_collected_subscriber_is_not_notified():
    """Only the observer that is still referenced receives incoming data."""
    from moler.threaded_moler_connection import ThreadedMolerConnection

    deliveries = []

    class Subscriber(object):
        def __call__(self, data, time_recv):
            deliveries.append(data)

    moler_conn = ThreadedMolerConnection()
    doomed = Subscriber()
    survivor = Subscriber()
    moler_conn.subscribe(observer=doomed, connection_closed_handler=do_nothing_func)
    moler_conn.subscribe(observer=survivor, connection_closed_handler=do_nothing_func)

    # Release one observer; ``survivor`` stays referenced by this frame.
    del doomed
    gc.collect()

    moler_conn.data_received("data", datetime.datetime.now())
    MolerTest.sleep(1, True)  # Processing in separate thread so have to wait.
    assert len(deliveries) == 1

# --------------------------- resources ---------------------------

开发者ID:nokia,项目名称:moler,代码行数:25,

示例12: test_structured_object_indexing

​点赞 6

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def test_structured_object_indexing(self, shape, index, items_changed,
                                    dt, pat, count, singleton):
    """Structured object reference counting for advanced indexing."""
    zero = 0
    one = 1

    arr = np.zeros(shape, dt)

    # Baseline reference counts, taken after a collection.
    gc.collect()
    zero_refs_start = sys.getrefcount(zero)
    one_refs_start = sys.getrefcount(one)

    # Test item getting: the selection is expected to add
    # ``count * items_changed`` references to ``zero``.
    part = arr[index]
    zero_refs_now = sys.getrefcount(zero)
    assert zero_refs_now - zero_refs_start == count * items_changed
    del part

    # Test item setting: the same number of references is expected to
    # move from ``zero`` to ``one``.
    arr[index] = one
    gc.collect()
    zero_refs_now = sys.getrefcount(zero)
    one_refs_now = sys.getrefcount(one)
    assert zero_refs_start - zero_refs_now == count * items_changed
    assert one_refs_now - one_refs_start == count * items_changed

开发者ID:Frank-qlu,项目名称:recruit,代码行数:25,

示例13: test_structured_object_take_and_repeat

​点赞 6

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def test_structured_object_take_and_repeat(self, dt, pat, count, singleton):
    """Structured object reference counting for specialized functions.

    The older functions such as take and repeat use different code paths
    than item setting (when writing this).
    """
    indices = [0, 1]

    arr = np.array([pat] * 3, dt)
    gc.collect()
    refs_start = sys.getrefcount(singleton)

    # Taking two items is expected to add ``count * 2`` references.
    res = arr.take(indices)
    refs_after_take = sys.getrefcount(singleton)
    assert refs_after_take - refs_start == count * 2

    # ``new`` must stay bound: it holds the references counted below.
    new = res.repeat(10)
    gc.collect()
    refs_after_repeat = sys.getrefcount(singleton)
    assert refs_after_repeat - refs_after_take == count * 2 * 10

开发者ID:Frank-qlu,项目名称:recruit,代码行数:19,

示例14: test_leak_in_structured_dtype_comparison

​点赞 6

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def test_leak_in_structured_dtype_comparison(self):
    """Comparing structured arrays or their items must not leak references."""
    # gh-6250
    record_dtype = np.dtype([('a', np.float64),
                             ('b', np.int32),
                             ('d', (str, 5))])

    # Simple case: repeated self-comparison keeps the refcount small.
    records = np.zeros(2, dtype=record_dtype)
    for _ in range(100):
        records == records
    assert_(sys.getrefcount(records) < 10)

    # The case in the bug report: compare two items of the array.
    refs_before = sys.getrefcount(records)
    first, second = records[0], records[1]
    first == second
    del first, second
    gc.collect()
    refs_after = sys.getrefcount(records)
    assert_equal(refs_before, refs_after)

开发者ID:Frank-qlu,项目名称:recruit,代码行数:22,

示例15: test_env_dirty

​点赞 6

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def test_env_dirty(self):
    """A modified record is tracked as dirty until it is garbage collected."""
    import gc

    self.odoo.config['auto_commit'] = False

    def check_dirty_tracking():
        # Runs in its own frame so ``user`` is released when it returns.
        user_ids = self.odoo.env['res.users'].search([('id', '!=', 1)])
        user = self.user_obj.browse(user_ids[0])
        self.assertNotIn(user, self.odoo.env.dirty)
        self.assertNotIn(user, user.env.dirty)
        user.name = "Joe"
        self.assertIn(user, self.odoo.env.dirty)
        self.assertIn(user, user.env.dirty)

    check_dirty_tracking()

    # Ensure the record has been garbage collected for the next test
    gc.collect()
    self.assertEqual(list(self.odoo.env.dirty), [])

开发者ID:OCA,项目名称:odoorpc,代码行数:20,

示例16: clear

​点赞 6

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def clear(self):
    """Reset every cache held by this object to an empty state.

    ``_messages`` becomes a ``deque`` bounded by ``max_messages`` when
    that value is truthy; otherwise it is set to ``max_messages`` itself.
    """
    self.user = None
    self._users = weakref.WeakValueDictionary()
    self._emojis = dict()
    self._calls = dict()
    self._guilds = dict()
    self._voice_clients = dict()

    # LRU of max size 128
    self._private_channels = OrderedDict()
    # extra dict to look up private channels by user id
    self._private_channels_by_user = dict()

    if self.max_messages:
        self._messages = deque(maxlen=self.max_messages)
    else:
        self._messages = self.max_messages

    # In cases of large deallocations the GC should be called explicitly
    # To free the memory more immediately, especially true when it comes
    # to reconnect loops which cause mass allocations and deallocations.
    gc.collect()

开发者ID:Rapptz,项目名称:discord.py,代码行数:20,

示例17: test_leaks

​点赞 6

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def test_leaks(self):
    """Every created item must be gone once it was stopped and released."""
    def _get_items():
        # Build everything first, stop everything afterwards, and hand
        # back only weak references so this frame keeps nothing alive.
        items, stops = [], []
        for creator in _CREATORS:
            item, stop = creator()
            items.append(item)
            stops.append(stop)
        for stop in stops:
            stop()
        return weakref.WeakSet(items)

    items = _get_items()
    _pause()

    # If this next line were uncommented, it would work without all the
    # weakrefs in the code.
    # gc.collect()

    self.assertEqual(list(items), [])

开发者ID:ManiacalLabs,项目名称:BiblioPixel,代码行数:18,

示例18: test_unclosed_client_session_issue_645_in_async_mode

​点赞 6

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def test_unclosed_client_session_issue_645_in_async_mode(self):
    """A timed-out async call must not leave an unclosed client session."""
    session_unclosed = False

    def exception_handler(_, context):
        # Flags the specific message reported through the loop handler.
        nonlocal session_unclosed
        if context["message"] == "Unclosed client session":
            session_unclosed = True

    async def issue_645():
        client = WebClient(base_url="http://localhost:8888", timeout=1, run_async=True)
        try:
            await client.users_list(token="xoxb-timeout")
        except asyncio.TimeoutError:
            pass

    loop = asyncio.get_event_loop()
    loop.set_exception_handler(exception_handler)
    loop.run_until_complete(issue_645())
    gc.collect()  # force Python to gc unclosed client session
    self.assertFalse(session_unclosed, "Unclosed client session")

开发者ID:slackapi,项目名称:python-slackclient,代码行数:21,

示例19: test_write_del

​点赞 6

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def test_write_del():
    """Deleting a started writer generator must end its ffmpeg process."""
    for _ in range(N):
        baseline = get_ffmpeg_pids()

        writer = imageio_ffmpeg.write_frames(test_file2, (64, 64))
        before_start = get_ffmpeg_pids().difference(baseline)  # generator has not started

        writer.send(None)
        writer.send(b"x" * 64 * 64 * 3)
        while_running = get_ffmpeg_pids().difference(baseline)  # now ffmpeg is running

        del writer
        gc.collect()
        after_delete = get_ffmpeg_pids().difference(baseline)  # now its not

        assert len(before_start) == 0
        assert len(while_running) == 1
        assert len(after_delete) == 0

开发者ID:imageio,项目名称:imageio-ffmpeg,代码行数:18,

示例20: test_threading

​点赞 6

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def test_threading():
    """Read first frames from many threads while collecting in the main one."""
    # See issue #20
    num_threads = 16
    num_frames = 5

    def make_iterator(q, n):
        # Each round opens a new reader and queues its first frame.
        for _ in range(n):
            gen = imageio_ffmpeg.read_frames(test_file1)
            next(gen)  # meta data
            q.put(next(gen))  # first frame

    q = queue.Queue()
    threads = []
    for _ in range(num_threads):
        worker = threading.Thread(target=make_iterator, args=(q, num_frames))
        worker.daemon = True
        worker.start()
        threads.append(worker)

    # Drain exactly as many frames as the workers produce in total.
    for i in range(num_threads * num_frames):
        print(i, end=" ")
        q.get()
        gc.collect()  # this seems to help invoke the segfault earlier

开发者ID:imageio,项目名称:imageio-ffmpeg,代码行数:26,

示例21: __getitem__

​点赞 6

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def __getitem__(self, item):
    """Dispatch ``obj[item]`` to ``collect`` or to ``find_closest`` + ``read``.

    ``item`` is either the time argument alone or a tuple/list whose first
    element is the time argument and whose second element is ``filters``.

    * A ``slice`` returns ``self.collect(start, stop, filters=filters)``.
    * A ``datetime`` or ``str`` returns ``self.read`` of the file name
      given by ``self.find_closest``, or ``None`` when none was found.
    * Any other time argument returns ``None``.
    """
    if isinstance(item, (tuple, list)):
        time_args, filters = item[0], item[1]
    else:
        time_args, filters = item, None

    if isinstance(time_args, slice):
        return self.collect(time_args.start, time_args.stop, filters=filters)

    if isinstance(time_args, (datetime, str)):
        filename = self.find_closest(time_args, filters=filters)
        return None if filename is None else self.read(filename)

    return None

开发者ID:atmtools,项目名称:typhon,代码行数:20,

示例22: deletePreEmb

​点赞 5

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def deletePreEmb(self):
    """Replace ``self.pre_emb`` with an empty dict and force a collection."""
    self.pre_emb = {}
    gc.collect()

开发者ID:dhwajraj,项目名称:deep-siamese-text-similarity,代码行数:5,

示例23: getDataSets

​点赞 5

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def getDataSets(self, training_paths, max_document_length, percent_dev, batch_size, is_char_based):
    """Load sentence pairs, build the vocabulary and split train/dev.

    Args:
        training_paths: Passed to ``getTsvDataCharBased`` / ``getTsvData``.
        max_document_length: Passed to ``MyVocabularyProcessor``.
        percent_dev: Percentage of the shuffled samples put in the dev set.
        batch_size: Used only to compute the number of training batches.
        is_char_based: Selects the loader and the vocabulary mode.

    Returns:
        ``(train_set, dev_set, vocab_processor, sum_no_of_batches)`` where
        both sets are ``(x1, x2, y)`` tuples and ``sum_no_of_batches`` is
        ``len(y_train) // batch_size``.
    """
    if is_char_based:
        x1_text, x2_text, y = self.getTsvDataCharBased(training_paths)
    else:
        x1_text, x2_text, y = self.getTsvData(training_paths)

    # Build vocabulary
    print("Building vocabulary")
    vocab_processor = MyVocabularyProcessor(max_document_length, min_frequency=0, is_char_based=is_char_based)
    vocab_processor.fit_transform(np.concatenate((x2_text, x1_text), axis=0))
    print("Length of loaded vocabulary ={}".format(len(vocab_processor.vocabulary_)))

    x1 = np.asarray(list(vocab_processor.transform(x1_text)))
    x2 = np.asarray(list(vocab_processor.transform(x2_text)))

    # Randomly shuffle data (fixed seed, so the split is reproducible)
    np.random.seed(131)
    shuffle_indices = np.random.permutation(np.arange(len(y)))
    x1_shuffled = x1[shuffle_indices]
    x2_shuffled = x2[shuffle_indices]
    y_shuffled = y[shuffle_indices]
    # Negative index counted from the end; floor division rounds away
    # from zero here. NOTE: percent_dev == 0 yields dev_idx == 0, so the
    # slices below put every sample in the dev set and none in train.
    dev_idx = -1*len(y_shuffled)*percent_dev//100
    # The unshuffled id arrays are no longer needed.
    del x1
    del x2

    # Split train/test set
    self.dumpValidation(x1_text, x2_text, y, shuffle_indices, dev_idx, 0)
    # TODO: This is very crude, should use cross-validation
    x1_train, x1_dev = x1_shuffled[:dev_idx], x1_shuffled[dev_idx:]
    x2_train, x2_dev = x2_shuffled[:dev_idx], x2_shuffled[dev_idx:]
    y_train, y_dev = y_shuffled[:dev_idx], y_shuffled[dev_idx:]
    print("Train/Dev split for {}: {:d}/{:d}".format(training_paths, len(y_train), len(y_dev)))

    sum_no_of_batches = len(y_train)//batch_size
    train_set = (x1_train, x2_train, y_train)
    dev_set = (x1_dev, x2_dev, y_dev)
    gc.collect()
    return train_set, dev_set, vocab_processor, sum_no_of_batches

开发者ID:dhwajraj,项目名称:deep-siamese-text-similarity,代码行数:39,

示例24: getTestDataSet

​点赞 5

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def getTestDataSet(self, data_path, vocab_path, max_document_length):
    """Load test sentence pairs and map them through a saved vocabulary.

    Args:
        data_path: Passed to ``getTsvTestData``.
        vocab_path: Location the vocabulary processor is restored from.
        max_document_length: Passed to ``MyVocabularyProcessor``.

    Returns:
        ``(x1, x2, y)``: the two transformed inputs as NumPy arrays and
        ``y`` exactly as returned by ``getTsvTestData``.
    """
    x1_temp, x2_temp, y = self.getTsvTestData(data_path)

    # Restore the saved vocabulary; the processor that is actually used is
    # the object returned by ``restore``.
    vocab_processor = MyVocabularyProcessor(max_document_length, min_frequency=0)
    vocab_processor = vocab_processor.restore(vocab_path)
    # Function-call form: valid on Python 3, same output on Python 2.
    print(len(vocab_processor.vocabulary_))

    x1 = np.asarray(list(vocab_processor.transform(x1_temp)))
    x2 = np.asarray(list(vocab_processor.transform(x2_temp)))

    # The vocabulary is not returned; release it before handing back data.
    del vocab_processor
    gc.collect()
    return x1, x2, y

开发者ID:dhwajraj,项目名称:deep-siamese-text-similarity,代码行数:16,

示例25: __iter__

​点赞 5

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def __iter__(self):
    """Yield every batch from every pickled file in ``self.data_files``.

    Files are read one at a time from ``self.data_folder``; when
    ``self.shuffle`` is truthy the batches of a file are shuffled in place
    before being yielded. Each file's batches are released and a
    collection is forced before the next file is opened.
    """
    for name in self.data_files:
        path = os.path.join(self.data_folder, name)
        # NOTE: pickle.load runs arbitrary code from the file; only use
        # this on data files you trust.
        with open(path, 'rb') as handle:
            batches = pickle.load(handle)

        if self.shuffle:
            random.shuffle(batches)  # shuffle data before batch

        for batch in batches:
            yield batch

        del batches
        gc.collect()

开发者ID:wengong-jin,项目名称:hgraph2graph,代码行数:14,

示例26: _run_test

​点赞 5

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def _run_test(self, work_func: FunctionType, work_resource: object,
              jobs: int, trials: int,
              show_progress: bool=False) -> Mapping:
    """Time ``self.map`` over ``jobs`` inputs, repeated ``trials`` times.

    Args:
        work_func: Function handed to ``self.map``; it receives one
            ``(work_resource, job_index)`` tuple per call.
        work_resource: Object paired with every job index.
        jobs: Number of inputs per trial.
        trials: Number of repetitions.
        show_progress: When ``True`` and there are more than two trials,
            the trial loop is wrapped in a ``tqdm`` progress bar.

    Returns:
        A dict with ``'jobs'``, ``'trials'``, and the per-trial lists
        ``'time'`` (elapsed seconds) and ``'blocks'`` (change in
        ``sys.getallocatedblocks()`` across the trial).
    """
    results = {
        'jobs': jobs,
        'trials': trials,
        'time': [],
        'blocks': [],
    }
    # Forcibly evaluate the inputs to prevent time/resources taken up later
    inputs = list(zip(
        [work_resource] * jobs,
        range(jobs)
    ))
    trial_iter = range(trials)
    if show_progress is True and trials > 2:
        trial_iter = tqdm(trial_iter, desc='trials')
    gc.collect()
    for _ in trial_iter:
        # Run trial of pool map function and measure it
        gc.collect()
        blocks_start = sys.getallocatedblocks()
        # perf_counter is monotonic, so a system clock adjustment during
        # a trial cannot produce a wrong or negative duration.
        time_start = time.perf_counter()
        list(self.map(work_func, inputs))
        time_end = time.perf_counter()
        results['time'].append(time_end - time_start)
        # Get allocated blocks before garbage collection to show peak usage
        blocks_end = sys.getallocatedblocks()
        results['blocks'].append(blocks_end - blocks_start)
    return results

开发者ID:JohnStarich,项目名称:python-pool-performance,代码行数:32,

示例27: two_reservoir_problem

​点赞 5

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def two_reservoir_problem():
    """Yield a ``TwoReservoirWrapper`` for ``models/two_reservoir.json``.

    Code after the ``yield`` runs when the generator is resumed: it clears
    the global model cache and forces a garbage collection.
    """
    import gc

    model_path = os.path.join(TEST_FOLDER, 'models', 'two_reservoir.json')
    yield TwoReservoirWrapper(model_path)

    # Clean up the global model cache.
    clear_global_model_cache()
    # We force deallocation of the cache here to prevent problems using
    # process pools with pytest.
    gc.collect()

开发者ID:pywr,项目名称:pywr,代码行数:11,

示例28: two_reservoir_constrained_problem

​点赞 5

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def two_reservoir_constrained_problem():
    """Yield a ``TwoReservoirWrapper`` for ``models/two_reservoir_constrained.json``.

    Code after the ``yield`` runs when the generator is resumed: it clears
    the global model cache and forces a garbage collection.
    """
    import gc

    model_path = os.path.join(TEST_FOLDER, 'models', 'two_reservoir_constrained.json')
    yield TwoReservoirWrapper(model_path)

    # Clean up the global model cache.
    clear_global_model_cache()
    # We force deallocation of the cache here to prevent problems using
    # process pools with pytest.
    gc.collect()

开发者ID:pywr,项目名称:pywr,代码行数:11,

示例29: setup_method

​点赞 5

# 需要导入模块: import gc [as 别名]

# 或者: from gc import collect [as 别名]

def setup_method(self, method):
    """Force a garbage collection; ``method`` is not used."""
    gc.collect()

开发者ID:myhdl,项目名称:myhdl,代码行数:4,

注:本文中的gc.collect方法示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。

 类似资料: